You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hz...@apache.org on 2022/11/26 13:42:50 UTC
[rocketmq] branch dledger-controller-snapshot updated: [ISSUE #5585] Upgrade dledger version (#5586)
This is an automated email from the ASF dual-hosted git repository.
hzh0425 pushed a commit to branch dledger-controller-snapshot
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/dledger-controller-snapshot by this push:
new 1ba356572 [ISSUE #5585] Upgrade dledger version (#5586)
1ba356572 is described below
commit 1ba356572f3e1c573ab3719e0fc3799836e946cd
Author: hzh0425 <64...@qq.com>
AuthorDate: Sat Nov 26 21:42:31 2022 +0800
[ISSUE #5585] Upgrade dledger version (#5586)
* upgrade dledger version
---
.../broker/controller/ReplicasManager.java | 2 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 4 +-
.../processor/AbstractSendMessageProcessor.java | 57 +++++++++-------------
.../broker/processor/ReplyMessageProcessor.java | 4 +-
.../broker/processor/SendMessageProcessor.java | 12 ++---
.../broker/controller/ReplicasManagerTest.java | 2 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 15 ++++++
.../java/org/apache/rocketmq/common/UtilAll.java | 36 ++++++++------
.../controller/BrokerHeartbeatManager.java | 2 +-
.../apache/rocketmq/controller/BrokerLiveInfo.java | 16 +++++-
.../controller/elect/impl/DefaultElectPolicy.java | 12 +++--
.../impl/DLedgerControllerStateMachine.java | 12 +++--
.../impl/DefaultBrokerHeartbeatManager.java | 7 ++-
.../processor/ControllerRequestProcessor.java | 3 +-
.../impl/DefaultBrokerHeartbeatManagerTest.java | 2 +-
.../impl/manager/ReplicasInfoManagerTest.java | 41 ++++++++++++----
pom.xml | 2 +-
.../RegisterBrokerToControllerRequestHeader.java | 23 +++++++--
.../offset/ResetOffsetByTimeOldCommand.java | 13 +++--
19 files changed, 168 insertions(+), 97 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 5eceab0bb..a6589d2ea 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -292,7 +292,7 @@ public class ReplicasManager {
try {
final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress,
- this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset());
+ this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
final String newMasterAddress = registerResponse.getMasterAddress();
if (StringUtils.isNoneEmpty(newMasterAddress)) {
if (StringUtils.equals(newMasterAddress, this.localAddress)) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index f3ffc9293..a6853350e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -1161,9 +1161,9 @@ public class BrokerOuterAPI {
*/
public RegisterBrokerToControllerResponseHeader registerBrokerToController(
final String controllerAddress, final String clusterName,
- final String brokerName, final String address, final int epoch, final long maxOffset) throws Exception {
+ final String brokerName, final String address, final int epoch, final long maxOffset, final int electionPriority) throws Exception {
- final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, epoch, maxOffset);
+ final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, epoch, maxOffset, electionPriority);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index fe7040863..d87b765b6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -345,20 +345,25 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
- SendMessageRequestHeader requestHeader) {
+ SendMessageRequestHeader requestHeader, RemotingCommand request) {
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
- SendMessageContext traceContext;
- traceContext = new SendMessageContext();
- traceContext.setNamespace(namespace);
- traceContext.setProducerGroup(requestHeader.getProducerGroup());
- traceContext.setTopic(requestHeader.getTopic());
- traceContext.setMsgProps(requestHeader.getProperties());
- traceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- traceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
- traceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
- traceContext.setBornTimeStamp(requestHeader.getBornTimestamp());
- traceContext.setRequestTimeStamp(System.currentTimeMillis());
+ SendMessageContext sendMessageContext;
+ sendMessageContext = new SendMessageContext();
+ sendMessageContext.setNamespace(namespace);
+ sendMessageContext.setProducerGroup(requestHeader.getProducerGroup());
+ sendMessageContext.setTopic(requestHeader.getTopic());
+ sendMessageContext.setBodyLength(request.getBody().length);
+ sendMessageContext.setMsgProps(requestHeader.getProperties());
+ sendMessageContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ sendMessageContext.setBrokerAddr(this.brokerController.getBrokerAddr());
+ sendMessageContext.setQueueId(requestHeader.getQueueId());
+ sendMessageContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
+ sendMessageContext.setBornTimeStamp(requestHeader.getBornTimestamp());
+ sendMessageContext.setRequestTimeStamp(System.currentTimeMillis());
+
+ String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+ sendMessageContext.setCommercialOwner(owner);
Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
@@ -369,14 +374,14 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
if (uniqueKey == null) {
uniqueKey = "";
}
- traceContext.setMsgUniqueKey(uniqueKey);
+ sendMessageContext.setMsgUniqueKey(uniqueKey);
if (properties.containsKey(MessageConst.PROPERTY_SHARDING_KEY)) {
- traceContext.setMsgType(MessageType.Order_Msg);
+ sendMessageContext.setMsgType(MessageType.Order_Msg);
} else {
- traceContext.setMsgType(MessageType.Normal_Msg);
+ sendMessageContext.setMsgType(MessageType.Normal_Msg);
}
- return traceContext;
+ return sendMessageContext;
}
public boolean hasSendMessageHook() {
@@ -532,29 +537,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
}
- public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
- SendMessageContext context) {
+ public void executeSendMessageHookBefore(SendMessageContext context) {
if (hasSendMessageHook()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
- final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
-
- String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
- if (null != requestHeader) {
- context.setNamespace(namespace);
- context.setProducerGroup(requestHeader.getProducerGroup());
- context.setTopic(requestHeader.getTopic());
- context.setBodyLength(request.getBody().length);
- context.setMsgProps(requestHeader.getProperties());
- context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- context.setBrokerAddr(this.brokerController.getBrokerAddr());
- context.setQueueId(requestHeader.getQueueId());
- }
-
hook.sendMessageBefore(context);
- if (requestHeader != null) {
- requestHeader.setProperties(context.getMsgProps());
- }
} catch (AbortProcessException e) {
throw e;
} catch (Throwable e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index 35468ab31..dbc87a870 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -70,8 +70,8 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
return null;
}
- mqtraceContext = buildMsgContext(ctx, requestHeader);
- this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
+ mqtraceContext = buildMsgContext(ctx, requestHeader, request);
+ this.executeSendMessageHookBefore(mqtraceContext);
RemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 3bf1f31a2..14095f9ec 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -81,7 +81,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
- SendMessageContext traceContext;
+ SendMessageContext sendMessageContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
@@ -95,11 +95,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (rewriteResult != null) {
return rewriteResult;
}
- traceContext = buildMsgContext(ctx, requestHeader);
- String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
- traceContext.setCommercialOwner(owner);
+ sendMessageContext = buildMsgContext(ctx, requestHeader, request);
try {
- this.executeSendMessageHookBefore(ctx, request, traceContext);
+ this.executeSendMessageHookBefore(sendMessageContext);
} catch (AbortProcessException e) {
final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
errorResponse.setOpaque(request.getOpaque());
@@ -108,10 +106,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
RemotingCommand response;
if (requestHeader.isBatch()) {
- response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext,
+ response = this.sendBatchMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
(ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
} else {
- response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext,
+ response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
(ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 9c08d8e67..84e578db5 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -123,7 +123,7 @@ public class ReplicasManagerTest {
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
- when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong())).thenReturn(registerBrokerToControllerResponseHeader);
+ when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
replicasManager = new ReplicasManager(brokerController);
autoSwitchHAService.init(defaultMessageStore);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 92f28d653..ec5986818 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -336,6 +336,13 @@ public class BrokerConfig extends BrokerIdentity {
private long syncControllerMetadataPeriod = 10 * 1000;
+ /**
+ * It is an important basis for the controller to choose the broker master.
+ * The lower the value of brokerElectionPriority, the higher the priority of the broker being selected as the master.
+ * You can set a lower priority for the broker with better machine conditions.
+ */
+ private int brokerElectionPriority = Integer.MAX_VALUE;
+
public enum MetricsExporterType {
DISABLE(0),
OTLP_GRPC(1),
@@ -1422,6 +1429,14 @@ public class BrokerConfig extends BrokerIdentity {
this.syncControllerMetadataPeriod = syncControllerMetadataPeriod;
}
+ public int getBrokerElectionPriority() {
+ return brokerElectionPriority;
+ }
+
+ public void setBrokerElectionPriority(int brokerElectionPriority) {
+ this.brokerElectionPriority = brokerElectionPriority;
+ }
+
public boolean isRecoverConcurrently() {
return recoverConcurrently;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 451a6b849..8fcab1ddf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -40,6 +40,7 @@ import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@@ -56,19 +57,28 @@ public class UtilAll {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final Logger STORE_LOG = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-
public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
- final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
- final static String HOST_NAME = ManagementFactory.getRuntimeMXBean().getName(); // format: "pid@hostname"
+ private final static char[] HEX_ARRAY;
+ private final static int PID;
+
+ static {
+ HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+ Supplier<Integer> supplier = () -> {
+ // format: "pid@hostname"
+ String currentJVM = ManagementFactory.getRuntimeMXBean().getName();
+ try {
+ return Integer.parseInt(currentJVM.substring(0, currentJVM.indexOf('@')));
+ } catch (Exception e) {
+ return -1;
+ }
+ };
+ PID = supplier.get();
+ }
public static int getPid() {
- try {
- return Integer.parseInt(HOST_NAME.substring(0, HOST_NAME.indexOf('@')));
- } catch (Exception e) {
- return -1;
- }
+ return PID;
}
public static void sleep(long sleepMs) {
@@ -213,7 +223,7 @@ public class UtilAll {
File file = new File(path);
if (!file.exists())
return -1;
- return file.getTotalSpace();
+ return file.getTotalSpace();
} catch (Exception e) {
return -1;
}
@@ -230,7 +240,6 @@ public class UtilAll {
return -1;
}
-
try {
File file = new File(path);
@@ -268,12 +277,11 @@ public class UtilAll {
try {
File file = new File(path);
-
if (!file.exists()) {
return -1;
}
- return file.getTotalSpace() - file.getFreeSpace() + file.getUsableSpace();
+ return file.getTotalSpace() - file.getFreeSpace() + file.getUsableSpace();
} catch (Exception e) {
return -1;
}
@@ -565,7 +573,7 @@ public class UtilAll {
return null;
}
return new StringBuilder().append(ip[0] & 0xFF).append(".").append(
- ip[1] & 0xFF).append(".").append(ip[2] & 0xFF)
+ ip[1] & 0xFF).append(".").append(ip[2] & 0xFF)
.append(".").append(ip[3] & 0xFF).toString();
}
@@ -752,7 +760,7 @@ public class UtilAll {
}
}
- private static void createDirIfNotExist(String dirName) {
+ private static void createDirIfNotExist(String dirName) {
File f = new File(dirName);
if (!f.exists()) {
boolean result = f.mkdirs();
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 364b32647..fd41aa21a 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -49,7 +49,7 @@ public interface BrokerHeartbeatManager {
* Register new broker to heartManager.
*/
void registerBroker(final String clusterName, final String brokerName, final String brokerAddr, final long brokerId,
- final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset);
+ final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset, final Integer electionPriority);
/**
* Broker channel close
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
index e88b26c39..faaf298d2 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
@@ -31,9 +31,10 @@ public class BrokerLiveInfo {
private int epoch;
private long maxOffset;
private long confirmOffset;
+ private Integer electionPriority;
public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
- Channel channel, int epoch, long maxOffset) {
+ Channel channel, int epoch, long maxOffset, Integer electionPriority) {
this.brokerName = brokerName;
this.brokerAddr = brokerAddr;
this.brokerId = brokerId;
@@ -41,10 +42,12 @@ public class BrokerLiveInfo {
this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
this.channel = channel;
this.epoch = epoch;
+ this.electionPriority = electionPriority;
this.maxOffset = maxOffset;
}
+
public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
- Channel channel, int epoch, long maxOffset, long confirmOffset) {
+ Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
this.brokerName = brokerName;
this.brokerAddr = brokerAddr;
this.brokerId = brokerId;
@@ -53,6 +56,7 @@ public class BrokerLiveInfo {
this.channel = channel;
this.epoch = epoch;
this.maxOffset = maxOffset;
+ this.electionPriority = electionPriority;
this.confirmOffset = confirmOffset;
}
@@ -123,6 +127,14 @@ public class BrokerLiveInfo {
this.confirmOffset = confirmOffset;
}
+ public void setElectionPriority(Integer electionPriority) {
+ this.electionPriority = electionPriority;
+ }
+
+ public Integer getElectionPriority() {
+ return electionPriority;
+ }
+
public long getConfirmOffset() {
return confirmOffset;
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
index 7af029b98..00cac1627 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
@@ -36,8 +36,14 @@ public class DefaultElectPolicy implements ElectPolicy {
// <clusterName, brokerAddr, BrokerLiveInfo>, Used to obtain the BrokerLiveInfo information of a broker
private BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter;
- private final Comparator<BrokerLiveInfo> comparator = (x, y) -> {
- return x.getEpoch() == y.getEpoch() ? (int) (y.getMaxOffset() - x.getMaxOffset()) : y.getEpoch() - x.getEpoch();
+ // Sort in descending order according to<epoch, offset>, and sort in ascending order according to priority
+ private final Comparator<BrokerLiveInfo> comparator = (o1, o2) -> {
+ if (o1.getEpoch() == o2.getEpoch()) {
+ return o1.getMaxOffset() == o2.getMaxOffset() ? o1.getElectionPriority() - o2.getElectionPriority() :
+ (int) (o2.getMaxOffset() - o1.getMaxOffset());
+ } else {
+ return o2.getEpoch() - o1.getEpoch();
+ }
};
public DefaultElectPolicy(BiPredicate<String, String> validPredicate, BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) {
@@ -55,7 +61,7 @@ public class DefaultElectPolicy implements ElectPolicy {
* - Filter alive brokers by 'validPredicate'.
* - Check whether the old master is still valid.
* - If preferBrokerAddr is not empty and valid, select it as master.
- * - Otherwise, we will sort the array of 'brokerLiveInfo' according to (epoch, offset), and select the best candidate as the new master.
+ * - Otherwise, we will sort the array of 'brokerLiveInfo' according to (epoch, offset, electionPriority), and select the best candidate as the new master.
*
* @param clusterName the brokerGroup belongs
* @param syncStateBrokers all broker replicas in syncStateSet
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
index dde94e998..4f1408b37 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
@@ -17,11 +17,11 @@
package org.apache.rocketmq.controller.impl;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
+import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator;
import io.openmessaging.storage.dledger.statemachine.StateMachine;
-import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.event.EventSerializer;
@@ -30,7 +30,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
/**
- * The state machine implementation of the dledger controller
+ * The state machine implementation of the DLedger controller
*/
public class DLedgerControllerStateMachine implements StateMachine {
private static final Logger log = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
@@ -61,7 +61,8 @@ public class DLedgerControllerStateMachine implements StateMachine {
}
@Override
- public void onSnapshotSave(SnapshotWriter writer, CompletableFuture<Boolean> future) {
+ public boolean onSnapshotSave(SnapshotWriter writer) {
+ return false;
}
@Override
@@ -74,6 +75,11 @@ public class DLedgerControllerStateMachine implements StateMachine {
public void onShutdown() {
}
+ @Override
+ public void onError(DLedgerException e) {
+
+ }
+
@Override
public String getBindDLedgerId() {
return dLedgerId;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 94a43fa26..eabae152b 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -101,7 +101,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
@Override
public void registerBroker(String clusterName, String brokerName, String brokerAddr,
- long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset) {
+ long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, final Integer electionPriority) {
final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo,
new BrokerLiveInfo(brokerName,
@@ -109,7 +109,10 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
brokerId,
System.currentTimeMillis(),
timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
- channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 : maxOffset));
+ channel,
+ epoch == null ? -1 : epoch,
+ maxOffset == null ? -1 : maxOffset,
+ electionPriority == null ? Integer.MAX_VALUE : electionPriority));
if (prevBrokerLiveInfo == null) {
log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId);
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 04caec015..4cbc1140e 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -114,7 +114,8 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
- responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(), controllerRequest.getEpoch(), controllerRequest.getMaxOffset());
+ responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
+ controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getElectionPriority());
}
return response;
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index b8a151ea0..dd0c60a65 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -43,7 +43,7 @@ public class DefaultBrokerHeartbeatManagerTest {
this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerAddress, brokerId) -> {
latch.countDown();
});
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null, 1, 1L);
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null, 1, 1L, 0);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
this.heartbeatManager.shutdown();
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 0b7faafc0..270f98089 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -126,29 +126,38 @@ public class ReplicasInfoManagerTest {
public void mockHeartbeatDataMasterStillAlive() {
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L, 0);
}
public void mockHeartbeatDataHigherEpoch() {
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L);
+ 1, 3L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 0, 3L);
+ 0, 3L, 0);
}
public void mockHeartbeatDataHigherOffset() {
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L);
+ 1, 3L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L);
+ 1, 2L, 0);
this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L);
+ 1, 3L, 0);
+ }
+
+ public void mockHeartbeatDataHigherPriority() {
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+ 1, 3L, 3);
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 3L, 2);
+ this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+ 1, 3L, 1);
}
@Test
@@ -190,6 +199,20 @@ public class ReplicasInfoManagerTest {
assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
}
+ @Test
+ public void testElectMasterPreferHigherPriorityWhenEpochAndOffsetEquals() {
+ mockMetaData();
+ final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+ ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
+ mockHeartbeatDataHigherPriority();
+ final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
+ electPolicy);
+ final ElectMasterResponseHeader response = cResult.getResponse();
+ assertEquals(response.getMasterEpoch(), 2);
+ assertFalse(response.getNewMasterAddress().isEmpty());
+ assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
+ }
+
@Test
public void testElectMaster() {
mockMetaData();
diff --git a/pom.xml b/pom.xml
index 197cea783..cff2d293c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
<lz4-java.version>1.8.0</lz4-java.version>
<opentracing.version>0.33.0</opentracing.version>
<jaeger.version>1.6.0</jaeger.version>
- <dleger.version>0.3.1</dleger.version>
+ <dleger.version>0.3.2</dleger.version>
<annotations-api.version>6.0.53</annotations-api.version>
<extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
index 81ed03d48..a8e745e4f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
@@ -30,23 +30,27 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
private Long maxOffset;
@CFNullable
private Long heartbeatTimeoutMillis;
-
+ @CFNullable
+ private Integer electionPriority;
public RegisterBrokerToControllerRequestHeader() {
}
public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress) {
- this.clusterName = clusterName;
- this.brokerName = brokerName;
- this.brokerAddress = brokerAddress;
+ this(clusterName, brokerName, brokerAddress, 0);
}
- public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int epoch, long maxOffset) {
+ public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int electionPriority) {
+ this(clusterName, brokerName, brokerAddress, 0, 0, electionPriority);
+ }
+
+ public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int epoch, long maxOffset, int electionPriority) {
this.clusterName = clusterName;
this.brokerName = brokerName;
this.brokerAddress = brokerAddress;
this.epoch = epoch;
this.maxOffset = maxOffset;
+ this.electionPriority = electionPriority;
}
public String getClusterName() {
@@ -81,6 +85,14 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
}
+ public Integer getElectionPriority() {
+ return electionPriority;
+ }
+
+ public void setElectionPriority(Integer electionPriority) {
+ this.electionPriority = electionPriority;
+ }
+
@Override
public String toString() {
return "RegisterBrokerToControllerRequestHeader{" +
@@ -90,6 +102,7 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
", epoch=" + epoch +
", maxOffset=" + maxOffset +
", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+ ", electionPriority=" + electionPriority +
'}';
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
index ec99ec89b..7984bb8c3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
@@ -116,15 +116,14 @@ public class ResetOffsetByTimeOldCommand implements SubCommand {
System.out.printf("specified timestamp invalid.%n");
return;
}
+ }
- boolean force = true;
- if (commandLine.hasOption('f')) {
- force = Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
- }
-
- defaultMQAdminExt.start();
- resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp, force, timeStampStr);
+ boolean force = true;
+ if (commandLine.hasOption('f')) {
+ force = Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
}
+ defaultMQAdminExt.start();
+ resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp, force, timeStampStr);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);