You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hz...@apache.org on 2022/11/26 13:42:50 UTC

[rocketmq] branch dledger-controller-snapshot updated: [ISSUE #5585] Upgrade dledger version (#5586)

This is an automated email from the ASF dual-hosted git repository.

hzh0425 pushed a commit to branch dledger-controller-snapshot
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/dledger-controller-snapshot by this push:
     new 1ba356572 [ISSUE #5585] Upgrade dledger version (#5586)
1ba356572 is described below

commit 1ba356572f3e1c573ab3719e0fc3799836e946cd
Author: hzh0425 <64...@qq.com>
AuthorDate: Sat Nov 26 21:42:31 2022 +0800

    [ISSUE #5585] Upgrade dledger version (#5586)
    
    * upgrade dledger version
---
 .../broker/controller/ReplicasManager.java         |  2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  4 +-
 .../processor/AbstractSendMessageProcessor.java    | 57 +++++++++-------------
 .../broker/processor/ReplyMessageProcessor.java    |  4 +-
 .../broker/processor/SendMessageProcessor.java     | 12 ++---
 .../broker/controller/ReplicasManagerTest.java     |  2 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   | 15 ++++++
 .../java/org/apache/rocketmq/common/UtilAll.java   | 36 ++++++++------
 .../controller/BrokerHeartbeatManager.java         |  2 +-
 .../apache/rocketmq/controller/BrokerLiveInfo.java | 16 +++++-
 .../controller/elect/impl/DefaultElectPolicy.java  | 12 +++--
 .../impl/DLedgerControllerStateMachine.java        | 12 +++--
 .../impl/DefaultBrokerHeartbeatManager.java        |  7 ++-
 .../processor/ControllerRequestProcessor.java      |  3 +-
 .../impl/DefaultBrokerHeartbeatManagerTest.java    |  2 +-
 .../impl/manager/ReplicasInfoManagerTest.java      | 41 ++++++++++++----
 pom.xml                                            |  2 +-
 .../RegisterBrokerToControllerRequestHeader.java   | 23 +++++++--
 .../offset/ResetOffsetByTimeOldCommand.java        | 13 +++--
 19 files changed, 168 insertions(+), 97 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 5eceab0bb..a6589d2ea 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -292,7 +292,7 @@ public class ReplicasManager {
         try {
             final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
                 this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress,
-                this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset());
+                this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
             final String newMasterAddress = registerResponse.getMasterAddress();
             if (StringUtils.isNoneEmpty(newMasterAddress)) {
                 if (StringUtils.equals(newMasterAddress, this.localAddress)) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index f3ffc9293..a6853350e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -1161,9 +1161,9 @@ public class BrokerOuterAPI {
      */
     public RegisterBrokerToControllerResponseHeader registerBrokerToController(
             final String controllerAddress, final String clusterName,
-            final String brokerName, final String address, final int epoch, final long maxOffset) throws Exception {
+            final String brokerName, final String address, final int epoch, final long maxOffset, final int electionPriority) throws Exception {
 
-        final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, epoch, maxOffset);
+        final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, epoch, maxOffset, electionPriority);
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
         final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
         assert response != null;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index fe7040863..d87b765b6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -345,20 +345,25 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     }
 
     protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
-        SendMessageRequestHeader requestHeader) {
+        SendMessageRequestHeader requestHeader, RemotingCommand request) {
         String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
 
-        SendMessageContext traceContext;
-        traceContext = new SendMessageContext();
-        traceContext.setNamespace(namespace);
-        traceContext.setProducerGroup(requestHeader.getProducerGroup());
-        traceContext.setTopic(requestHeader.getTopic());
-        traceContext.setMsgProps(requestHeader.getProperties());
-        traceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-        traceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
-        traceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
-        traceContext.setBornTimeStamp(requestHeader.getBornTimestamp());
-        traceContext.setRequestTimeStamp(System.currentTimeMillis());
+        SendMessageContext sendMessageContext;
+        sendMessageContext = new SendMessageContext();
+        sendMessageContext.setNamespace(namespace);
+        sendMessageContext.setProducerGroup(requestHeader.getProducerGroup());
+        sendMessageContext.setTopic(requestHeader.getTopic());
+        sendMessageContext.setBodyLength(request.getBody().length);
+        sendMessageContext.setMsgProps(requestHeader.getProperties());
+        sendMessageContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+        sendMessageContext.setBrokerAddr(this.brokerController.getBrokerAddr());
+        sendMessageContext.setQueueId(requestHeader.getQueueId());
+        sendMessageContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
+        sendMessageContext.setBornTimeStamp(requestHeader.getBornTimestamp());
+        sendMessageContext.setRequestTimeStamp(System.currentTimeMillis());
+
+        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+        sendMessageContext.setCommercialOwner(owner);
 
         Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties());
         String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
@@ -369,14 +374,14 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         if (uniqueKey == null) {
             uniqueKey = "";
         }
-        traceContext.setMsgUniqueKey(uniqueKey);
+        sendMessageContext.setMsgUniqueKey(uniqueKey);
 
         if (properties.containsKey(MessageConst.PROPERTY_SHARDING_KEY)) {
-            traceContext.setMsgType(MessageType.Order_Msg);
+            sendMessageContext.setMsgType(MessageType.Order_Msg);
         } else {
-            traceContext.setMsgType(MessageType.Normal_Msg);
+            sendMessageContext.setMsgType(MessageType.Normal_Msg);
         }
-        return traceContext;
+        return sendMessageContext;
     }
 
     public boolean hasSendMessageHook() {
@@ -532,29 +537,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
     }
 
-    public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
-        SendMessageContext context) {
+    public void executeSendMessageHookBefore(SendMessageContext context) {
         if (hasSendMessageHook()) {
             for (SendMessageHook hook : this.sendMessageHookList) {
                 try {
-                    final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
-
-                    String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
-                    if (null != requestHeader) {
-                        context.setNamespace(namespace);
-                        context.setProducerGroup(requestHeader.getProducerGroup());
-                        context.setTopic(requestHeader.getTopic());
-                        context.setBodyLength(request.getBody().length);
-                        context.setMsgProps(requestHeader.getProperties());
-                        context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-                        context.setBrokerAddr(this.brokerController.getBrokerAddr());
-                        context.setQueueId(requestHeader.getQueueId());
-                    }
-
                     hook.sendMessageBefore(context);
-                    if (requestHeader != null) {
-                        requestHeader.setProperties(context.getMsgProps());
-                    }
                 } catch (AbortProcessException e) {
                     throw e;
                 } catch (Throwable e) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index 35468ab31..dbc87a870 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -70,8 +70,8 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
             return null;
         }
 
-        mqtraceContext = buildMsgContext(ctx, requestHeader);
-        this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
+        mqtraceContext = buildMsgContext(ctx, requestHeader, request);
+        this.executeSendMessageHookBefore(mqtraceContext);
 
         RemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader);
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 3bf1f31a2..14095f9ec 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -81,7 +81,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
-        SendMessageContext traceContext;
+        SendMessageContext sendMessageContext;
         switch (request.getCode()) {
             case RequestCode.CONSUMER_SEND_MSG_BACK:
                 return this.consumerSendMsgBack(ctx, request);
@@ -95,11 +95,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 if (rewriteResult != null) {
                     return rewriteResult;
                 }
-                traceContext = buildMsgContext(ctx, requestHeader);
-                String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
-                traceContext.setCommercialOwner(owner);
+                sendMessageContext = buildMsgContext(ctx, requestHeader, request);
                 try {
-                    this.executeSendMessageHookBefore(ctx, request, traceContext);
+                    this.executeSendMessageHookBefore(sendMessageContext);
                 } catch (AbortProcessException e) {
                     final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                     errorResponse.setOpaque(request.getOpaque());
@@ -108,10 +106,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
                 RemotingCommand response;
                 if (requestHeader.isBatch()) {
-                    response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext,
+                    response = this.sendBatchMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                         (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
                 } else {
-                    response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext,
+                    response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                         (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
                 }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 9c08d8e67..84e578db5 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -123,7 +123,7 @@ public class ReplicasManagerTest {
         when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
         when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
         when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
-        when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong())).thenReturn(registerBrokerToControllerResponseHeader);
+        when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
         when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
         replicasManager = new ReplicasManager(brokerController);
         autoSwitchHAService.init(defaultMessageStore);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 92f28d653..ec5986818 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -336,6 +336,13 @@ public class BrokerConfig extends BrokerIdentity {
 
     private long syncControllerMetadataPeriod = 10 * 1000;
 
+    /**
+     * It is an important basis for the controller to choose the broker master.
+     * The lower the value of brokerElectionPriority, the higher the priority of the broker being selected as the master.
+     * You can set a lower value (i.e. a higher priority) for the broker with better machine conditions.
+     */
+    private int brokerElectionPriority = Integer.MAX_VALUE;
+
     public enum MetricsExporterType {
         DISABLE(0),
         OTLP_GRPC(1),
@@ -1422,6 +1429,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.syncControllerMetadataPeriod = syncControllerMetadataPeriod;
     }
 
+    public int getBrokerElectionPriority() {
+        return brokerElectionPriority;
+    }
+
+    public void setBrokerElectionPriority(int brokerElectionPriority) {
+        this.brokerElectionPriority = brokerElectionPriority;
+    }
+
     public boolean isRecoverConcurrently() {
         return recoverConcurrently;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 451a6b849..8fcab1ddf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -40,6 +40,7 @@ import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import java.util.zip.CRC32;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
@@ -56,19 +57,28 @@ public class UtilAll {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
     private static final Logger STORE_LOG = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
-
     public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
     public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
     public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
-    final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
-    final static String HOST_NAME = ManagementFactory.getRuntimeMXBean().getName(); // format: "pid@hostname"
+    private final static char[] HEX_ARRAY;
+    private final static int PID;
+
+    static {
+        HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+        Supplier<Integer> supplier = () -> {
+            // format: "pid@hostname"
+            String currentJVM = ManagementFactory.getRuntimeMXBean().getName();
+            try {
+                return Integer.parseInt(currentJVM.substring(0, currentJVM.indexOf('@')));
+            } catch (Exception e) {
+                return -1;
+            }
+        };
+        PID = supplier.get();
+    }
 
     public static int getPid() {
-        try {
-            return Integer.parseInt(HOST_NAME.substring(0, HOST_NAME.indexOf('@')));
-        } catch (Exception e) {
-            return -1;
-        }
+        return PID;
     }
 
     public static void sleep(long sleepMs) {
@@ -213,7 +223,7 @@ public class UtilAll {
             File file = new File(path);
             if (!file.exists())
                 return -1;
-            return  file.getTotalSpace();
+            return file.getTotalSpace();
         } catch (Exception e) {
             return -1;
         }
@@ -230,7 +240,6 @@ public class UtilAll {
             return -1;
         }
 
-
         try {
             File file = new File(path);
 
@@ -268,12 +277,11 @@ public class UtilAll {
         try {
             File file = new File(path);
 
-
             if (!file.exists()) {
                 return -1;
             }
 
-            return file.getTotalSpace() -  file.getFreeSpace() + file.getUsableSpace();
+            return file.getTotalSpace() - file.getFreeSpace() + file.getUsableSpace();
         } catch (Exception e) {
             return -1;
         }
@@ -565,7 +573,7 @@ public class UtilAll {
             return null;
         }
         return new StringBuilder().append(ip[0] & 0xFF).append(".").append(
-            ip[1] & 0xFF).append(".").append(ip[2] & 0xFF)
+                ip[1] & 0xFF).append(".").append(ip[2] & 0xFF)
             .append(".").append(ip[3] & 0xFF).toString();
     }
 
@@ -752,7 +760,7 @@ public class UtilAll {
         }
     }
 
-    private static void  createDirIfNotExist(String dirName) {
+    private static void createDirIfNotExist(String dirName) {
         File f = new File(dirName);
         if (!f.exists()) {
             boolean result = f.mkdirs();
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 364b32647..fd41aa21a 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -49,7 +49,7 @@ public interface BrokerHeartbeatManager {
      * Register new broker to heartManager.
      */
     void registerBroker(final String clusterName, final String brokerName, final String brokerAddr, final long brokerId,
-                        final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset);
+                        final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset, final Integer electionPriority);
 
     /**
      * Broker channel close
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
index e88b26c39..faaf298d2 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
@@ -31,9 +31,10 @@ public class BrokerLiveInfo {
     private int epoch;
     private long maxOffset;
     private long confirmOffset;
+    private Integer electionPriority;
 
     public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
-                          Channel channel, int epoch, long maxOffset) {
+                          Channel channel, int epoch, long maxOffset, Integer electionPriority) {
         this.brokerName = brokerName;
         this.brokerAddr = brokerAddr;
         this.brokerId = brokerId;
@@ -41,10 +42,12 @@ public class BrokerLiveInfo {
         this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
         this.channel = channel;
         this.epoch = epoch;
+        this.electionPriority = electionPriority;
         this.maxOffset = maxOffset;
     }
+
     public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
-                          Channel channel, int epoch, long maxOffset, long confirmOffset) {
+                          Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
         this.brokerName = brokerName;
         this.brokerAddr = brokerAddr;
         this.brokerId = brokerId;
@@ -53,6 +56,7 @@ public class BrokerLiveInfo {
         this.channel = channel;
         this.epoch = epoch;
         this.maxOffset = maxOffset;
+        this.electionPriority = electionPriority;
         this.confirmOffset = confirmOffset;
     }
 
@@ -123,6 +127,14 @@ public class BrokerLiveInfo {
         this.confirmOffset = confirmOffset;
     }
 
+    public void setElectionPriority(Integer electionPriority) {
+        this.electionPriority = electionPriority;
+    }
+
+    public Integer getElectionPriority() {
+        return electionPriority;
+    }
+
     public long getConfirmOffset() {
         return confirmOffset;
     }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
index 7af029b98..00cac1627 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java
@@ -36,8 +36,14 @@ public class DefaultElectPolicy implements ElectPolicy {
     // <clusterName, brokerAddr, BrokerLiveInfo>, Used to obtain the BrokerLiveInfo information of a broker
     private BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter;
 
-    private final Comparator<BrokerLiveInfo> comparator = (x, y) -> {
-        return x.getEpoch() == y.getEpoch() ? (int) (y.getMaxOffset() - x.getMaxOffset()) : y.getEpoch() - x.getEpoch();
+    // Sort in descending order according to <epoch, offset>, and sort in ascending order according to priority
+    private final Comparator<BrokerLiveInfo> comparator = (o1, o2) -> {
+        if (o1.getEpoch() == o2.getEpoch()) {
+            return o1.getMaxOffset() == o2.getMaxOffset() ? o1.getElectionPriority() - o2.getElectionPriority() :
+                    (int) (o2.getMaxOffset() - o1.getMaxOffset());
+        } else {
+            return o2.getEpoch() - o1.getEpoch();
+        }
     };
 
     public DefaultElectPolicy(BiPredicate<String, String> validPredicate, BiFunction<String, String, BrokerLiveInfo> additionalInfoGetter) {
@@ -55,7 +61,7 @@ public class DefaultElectPolicy implements ElectPolicy {
      *    - Filter alive brokers by 'validPredicate'.
      *    - Check whether the old master is still valid.
      *    - If preferBrokerAddr is not empty and valid, select it as master.
-     *    - Otherwise, we will sort the array of 'brokerLiveInfo' according to (epoch, offset), and select the best candidate as the new master.
+     *    - Otherwise, we will sort the array of 'brokerLiveInfo' according to (epoch, offset, electionPriority), and select the best candidate as the new master.
      *
      * @param clusterName       the brokerGroup belongs
      * @param syncStateBrokers  all broker replicas in syncStateSet
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
index dde94e998..4f1408b37 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
@@ -17,11 +17,11 @@
 package org.apache.rocketmq.controller.impl;
 
 import io.openmessaging.storage.dledger.entry.DLedgerEntry;
+import io.openmessaging.storage.dledger.exception.DLedgerException;
 import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
 import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
 import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator;
 import io.openmessaging.storage.dledger.statemachine.StateMachine;
-import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.controller.impl.event.EventSerializer;
@@ -30,7 +30,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 /**
- * The state machine implementation of the dledger controller
+ * The state machine implementation of the DLedger controller
  */
 public class DLedgerControllerStateMachine implements StateMachine {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
@@ -61,7 +61,8 @@ public class DLedgerControllerStateMachine implements StateMachine {
     }
 
     @Override
-    public void onSnapshotSave(SnapshotWriter writer, CompletableFuture<Boolean> future) {
+    public boolean onSnapshotSave(SnapshotWriter writer) {
+        return false;
     }
 
     @Override
@@ -74,6 +75,11 @@ public class DLedgerControllerStateMachine implements StateMachine {
     public void onShutdown() {
     }
 
+    @Override
+    public void onError(DLedgerException e) {
+
+    }
+
     @Override
     public String getBindDLedgerId() {
         return dLedgerId;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 94a43fa26..eabae152b 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -101,7 +101,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
 
     @Override
     public void registerBroker(String clusterName, String brokerName, String brokerAddr,
-        long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset) {
+        long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, final Integer electionPriority) {
         final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
         final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo,
             new BrokerLiveInfo(brokerName,
@@ -109,7 +109,10 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
                 brokerId,
                 System.currentTimeMillis(),
                 timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
-                channel, epoch == null ? -1 : epoch, maxOffset == null ? -1 : maxOffset));
+                channel,
+                epoch == null ? -1 : epoch,
+                maxOffset == null ? -1 : maxOffset,
+                electionPriority == null ? Integer.MAX_VALUE : electionPriority));
         if (prevBrokerLiveInfo == null) {
             log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId);
         }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 04caec015..4cbc1140e 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -114,7 +114,8 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                     final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
                     if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
                         this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                            responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(), controllerRequest.getEpoch(), controllerRequest.getMaxOffset());
+                                                             responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
+                                                             controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getElectionPriority());
                     }
                     return response;
                 }
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index b8a151ea0..dd0c60a65 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -43,7 +43,7 @@ public class DefaultBrokerHeartbeatManagerTest {
         this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerAddress, brokerId) -> {
             latch.countDown();
         });
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null, 1, 1L);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null, 1, 1L, 0);
         assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
         this.heartbeatManager.shutdown();
     }
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 0b7faafc0..270f98089 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -126,29 +126,38 @@ public class ReplicasInfoManagerTest {
 
     public void mockHeartbeatDataMasterStillAlive() {
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, 10000000000L, null,
-            1, 3L);
+            1, 3L, 0); // trailing arguments: epoch, maxOffset, electionPriority (new in this change)
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
-            1, 2L);
+            1, 2L, 0); // same epoch as the others, smaller maxOffset
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
-            1, 3L);
+            1, 3L, 0); // all three brokers share electionPriority 0
     }
 
     public void mockHeartbeatDataHigherEpoch() {
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
-            1, 3L);
+            1, 3L, 0); // NOTE(review): negative heartbeat timeout, presumably so this broker counts as inactive; confirm
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
-            1, 2L);
+            1, 2L, 0); // epoch 1, maxOffset 2
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
-            0, 3L);
+            0, 3L, 0); // epoch 0, maxOffset 3: lower epoch but larger offset than 9001
     }
 
     public void mockHeartbeatDataHigherOffset() {
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
-            1, 3L);
+            1, 3L, 0); // NOTE(review): negative heartbeat timeout, presumably so this broker counts as inactive; confirm
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
-            1, 2L);
+            1, 2L, 0); // epoch 1, maxOffset 2
         this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
-            1, 3L);
+            1, 3L, 0); // epoch 1, maxOffset 3: same epoch as 9001, larger offset
+    }
+
+    public void mockHeartbeatDataHigherPriority() { // identical epoch (1) and maxOffset (3) everywhere; only electionPriority differs
+        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+            1, 3L, 3); // NOTE(review): negative heartbeat timeout, presumably so this broker counts as inactive; confirm
+        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+            1, 3L, 2);
+        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+            1, 3L, 1); // smallest electionPriority value; the election test expects this address to win
     }
 
     @Test
@@ -190,6 +199,20 @@ public class ReplicasInfoManagerTest {
         assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
     }
 
+    @Test
+    public void testElectMasterPreferHigherPriorityWhenEpochAndOffsetEquals() {
+        mockMetaData();
+        final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+        ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
+        mockHeartbeatDataHigherPriority(); // replicas register with equal epoch and maxOffset, differing only in electionPriority
+        final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
+                electPolicy);
+        final ElectMasterResponseHeader response = cResult.getResponse();
+        assertEquals(2, response.getMasterEpoch()); // expected value first, so a failure message reports the values the right way round
+        assertFalse(response.getNewMasterAddress().isEmpty());
+        assertEquals("127.0.0.1:9002", response.getNewMasterAddress()); // 9002 registered with electionPriority 1, the smallest value
+    }
+
     @Test
     public void testElectMaster() {
         mockMetaData();
diff --git a/pom.xml b/pom.xml
index 197cea783..cff2d293c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
         <lz4-java.version>1.8.0</lz4-java.version>
         <opentracing.version>0.33.0</opentracing.version>
         <jaeger.version>1.6.0</jaeger.version>
-        <dleger.version>0.3.1</dleger.version>
+        <dleger.version>0.3.2</dleger.version>
         <annotations-api.version>6.0.53</annotations-api.version>
         <extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
         <concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
index 81ed03d48..a8e745e4f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
@@ -30,23 +30,27 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
     private Long maxOffset;
     @CFNullable
     private Long heartbeatTimeoutMillis;
-
+    @CFNullable
+    private Integer electionPriority;
 
     public RegisterBrokerToControllerRequestHeader() {
     }
 
     public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress) {
-        this.clusterName = clusterName;
-        this.brokerName = brokerName;
-        this.brokerAddress = brokerAddress;
+        this(clusterName, brokerName, brokerAddress, 0); // NOTE(review): the chain also assigns epoch = 0 and maxOffset = 0, which the removed lines left unassigned; confirm no caller relies on them being unset
     }
 
-    public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int epoch, long maxOffset) {
+    public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int electionPriority) {
+        this(clusterName, brokerName, brokerAddress, 0, 0, electionPriority); // epoch and maxOffset are both passed as 0
+    }
+
+    public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int epoch, long maxOffset, int electionPriority) { // replaces the former five-argument overload that had no electionPriority
         this.clusterName = clusterName;
         this.brokerName = brokerName;
         this.brokerAddress = brokerAddress;
         this.epoch = epoch;
         this.maxOffset = maxOffset;
+        this.electionPriority = electionPriority; // primitive int is boxed into the nullable Integer field
     }
 
     public String getClusterName() {
@@ -81,6 +85,14 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
         this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
     }
 
+    public Integer getElectionPriority() { // may return null: the field is a boxed Integer annotated @CFNullable
+        return electionPriority;
+    }
+
+    public void setElectionPriority(Integer electionPriority) { // stores the value as given; no null check is applied here
+        this.electionPriority = electionPriority;
+    }
+
     @Override
     public String toString() {
         return "RegisterBrokerToControllerRequestHeader{" +
@@ -90,6 +102,7 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
                 ", epoch=" + epoch +
                 ", maxOffset=" + maxOffset +
                 ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+                ", electionPriority=" + electionPriority +
                 '}';
     }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
index ec99ec89b..7984bb8c3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
@@ -116,15 +116,14 @@ public class ResetOffsetByTimeOldCommand implements SubCommand {
                     System.out.printf("specified timestamp invalid.%n");
                     return;
                 }
+            }
 
-                boolean force = true;
-                if (commandLine.hasOption('f')) {
-                    force = Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
-                }
-
-                defaultMQAdminExt.start();
-                resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp, force, timeStampStr);
+            boolean force = true;
+            if (commandLine.hasOption('f')) {
+                force = Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
             }
+            defaultMQAdminExt.start();
+            resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp, force, timeStampStr);
 
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);