Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/16 03:58:17 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: Remove allAckInSyncStateSet in GroupCommitRequest and use ackNums = -1 instead (#4467)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 3fbcad5f9 Remove allAckInSyncStateSet in GroupCommitRequest and use ackNums = -1 instead (#4467)
3fbcad5f9 is described below

commit 3fbcad5f91fe08c51283d1b065c9b352821e2880
Author: rongtong <ji...@163.com>
AuthorDate: Thu Jun 16 11:58:11 2022 +0800

    Remove allAckInSyncStateSet in GroupCommitRequest and use ackNums = -1 instead (#4467)
    
    * Remove allAckInSyncStateSet in GroupCommitRequest and use ackNums = -1 instead
    
    * Pass the UT
    
    * Format the code style
    
    * Do the same for batch messages and polish the code
    
    * Polish the constant name
    
    * Polish the document
    
    * Polish the document
---
 .../java/org/apache/rocketmq/common/MixAll.java    |  1 +
 docs/cn/controller/design.md                       | 11 ++---
 docs/cn/controller/quick_start.md                  |  6 +--
 .../java/org/apache/rocketmq/store/CommitLog.java  | 54 +++++++++++-----------
 .../rocketmq/store/ha/GroupTransferService.java    | 12 +++--
 .../org/apache/rocketmq/store/ha/HAServerTest.java | 11 ++---
 6 files changed, 46 insertions(+), 49 deletions(-)
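
Reviewer note: the ack accounting this commit introduces can be sketched roughly as below. This is a simplified, hypothetical model, not RocketMQ code. The names AckPolicySketch, masterAloneSuffices and transferOk are made up for illustration. Only the -1 sentinel, the `needAckNums >= 0 && needAckNums <= 1` guard and the "count starts at 1 to include the master" rule come from the diff. The sketch assumes every connected slave is in the SyncStateSet, and that the fixed-count branch compares the tally against the requested count, which the hunks do not show.

```java
// Hypothetical, simplified model of the ack accounting in this commit.
final class AckPolicySketch {
    // Mirrors MixAll.ALL_ACK_IN_SYNC_STATE_SET added by the diff.
    static final int ALL_ACK_IN_SYNC_STATE_SET = -1;

    // Mirrors the new guard in CommitLog.handleHA: with 0 or 1 required acks
    // the master's own write is enough, so no slave wait is scheduled.
    static boolean masterAloneSuffices(int needAckNums) {
        return needAckNums >= 0 && needAckNums <= 1;
    }

    // Tallies acks starting from 1 (the master), then adds one per slave whose
    // ack offset has reached nextOffset. Assumption: all slaves passed in are
    // members of the SyncStateSet.
    static boolean transferOk(int needAckNums, long nextOffset, long... slaveAckOffsets) {
        int ackNums = 1; // include master
        for (long slaveAckOffset : slaveAckOffsets) {
            if (slaveAckOffset >= nextOffset) {
                ackNums++;
            }
        }
        if (needAckNums == ALL_ACK_IN_SYNC_STATE_SET) {
            // Every member of the SyncStateSet (slaves plus master) must ack.
            return ackNums >= slaveAckOffsets.length + 1;
        }
        // Assumed comparison for the fixed-count branch.
        return ackNums >= needAckNums;
    }

    public static void main(String[] args) {
        // One caught-up slave, as in the updated HAServerTest scenario.
        System.out.println(transferOk(2, 124L, 128L));
        System.out.println(transferOk(3, 124L, 128L));
    }
}
```

Under these assumptions, a request for 2 acks is met by the master plus one caught-up slave, while a request for 3 is not. This is consistent with the HAServerTest change from (1, 2) to (2, 3), since the count now includes the master.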

diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 638d03806..1885a3cf3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -77,6 +77,7 @@ public class MixAll {
     public static final long FIRST_SLAVE_ID = 1L;
     public static final long CURRENT_JVM_PID = getPID();
     public final static int UNIT_PRE_SIZE_FOR_MSG = 28;
+    public final static int ALL_ACK_IN_SYNC_STATE_SET = -1;
 
     public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
     public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
diff --git a/docs/cn/controller/design.md b/docs/cn/controller/design.md
index 0080986e1..e100881fd 100644
--- a/docs/cn/controller/design.md
+++ b/docs/cn/controller/design.md
@@ -2,9 +2,8 @@
 
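 The current RocketMQ Raft mode mainly replaces the original Commitlog with the DLedger Commitlog, giving the Commitlog election and replication capabilities, but this also causes some problems: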
 
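-- In Raft mode, a Broker group must have three or more replicas.
+- In Raft mode, a Broker group must have three or more replicas, and replica ACKs must also follow the majority protocol.
 - RocketMQ has two sets of HA replication flows, and replication in Raft mode cannot use RocketMQ's native storage capabilities.
-- In Raft mode, log replication performance is not efficient.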
 
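 We therefore want to use DLedger to implement a Raft-based consistency module (DLedger Controller) that serves as an optional leader-election component. It supports standalone deployment and can also be embedded in the Nameserver. Brokers complete Master election by interacting with the Controller, which solves the problems above. We call this new mode Controller mode.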
 
@@ -27,7 +26,7 @@
 
 ![image-20220605213143645](../image/controller/quick-start/controller.png)
 
-If, is the core design of DledgerController:
+The figure shows the core design of DledgerController:
 
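 - DLedgerController can be embedded in Namesrv or deployed independently.
 - The Active DLedgerController is the Leader elected by DLedger. It accepts event requests from clients, initiates consensus through DLedger, and finally applies them to the in-memory metadata state machine.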
@@ -195,11 +194,9 @@ Shrink SyncStateSet means taking those replicas in the SyncStateSet replica set whose gap from the Master
 
 - When ReadSocketService receives slaveAckOffset, if slaveAckOffset >= lastMasterMaxOffset, it updates lastCaughtUpTimeMs to lastTransferTimeMs.
 
-- The Master scans every HaConnection with a scheduled task; if (cur_time - connection.lastCaughtUpTimeMs) >
+- The Master scans every HaConnection with a scheduled task; if (cur_time - connection.lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchUp, the Slave is Out-of-sync.
 
-  haMaxTimeSlaveNotCatchUp, the Slave is Out-of-sync .
-
-- If a Slave is detected as out of sync , the master immediately reports to the Controller, in order to Shrink SyncStateSet .
+- If a Slave is detected as out of sync , the master immediately reports to the Controller, in order to Shrink SyncStateSet.
 
 #### Expand
 
diff --git a/docs/cn/controller/quick_start.md b/docs/cn/controller/quick_start.md
index c125b48ab..b3c86d75d 100644
--- a/docs/cn/controller/quick_start.md
+++ b/docs/cn/controller/quick_start.md
@@ -2,7 +2,7 @@
 
 ## Preface
 
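-This document mainly describes how to quickly build and deploy a Controller-based RocketMQ cluster with automatic disaster-recovery switching.
+This document mainly describes how to quickly build and deploy a Controller-based RocketMQ cluster with automatic switching.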
 
 For detailed guides on deploying a new cluster and upgrading an old cluster, see the [Deployment Guide](deploy_guide.md).
 
@@ -26,7 +26,7 @@
 
 If the steps above succeed, you can check the cluster status with the operations command.
 
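-At this point the startup has succeeded; you can now send and receive messages with the cluster and run disaster-recovery switching tests.
+At this point the startup has succeeded; you can now send and receive messages with the cluster and run switching tests.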
 
 To shut down the quick-start cluster, run:
 
@@ -58,7 +58,7 @@
 
 ![image-20220605205247476](../image/controller/quick-start/epoch.png)
 
-## Disaster-Recovery Switching
+## Switching
 
 After a successful deployment, now try a Master switch.
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b95804bb3..75213b83a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -349,7 +349,6 @@ public class CommitLog implements Swappable {
                 this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
             }
 
-
         } else {
             // Commitlog case files are deleted
             log.warn("The commitlog files are deleted, and delete the consume queue files");
@@ -803,13 +802,18 @@ public class CommitLog implements Swappable {
         boolean needHandleHA = needHandleHA(msg);
         int needAckNums = 1;
 
-        if (needHandleHA && !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
-            int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
-                this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
-            needAckNums = calcNeedAckNums(inSyncReplicas);
-            if (needAckNums > inSyncReplicas) {
-                // Tell the producer, don't have enough slaves to handle the send request
-                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+        if (needHandleHA) {
+            if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode() && this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
+                // -1 means all ack in SyncStateSet
+                needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
+            } else {
+                int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
+                    this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
+                needAckNums = calcNeedAckNums(inSyncReplicas);
+                if (needAckNums > inSyncReplicas) {
+                    // Tell the producer, don't have enough slaves to handle the send request
+                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+                }
             }
         }
 
@@ -950,13 +954,18 @@ public class CommitLog implements Swappable {
         int needAckNums = 1;
         boolean needHandleHA = needHandleHA(messageExtBatch);
 
-        if (needHandleHA && !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
-            int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
-                this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
-            needAckNums = calcNeedAckNums(inSyncReplicas);
-            if (needAckNums > inSyncReplicas) {
-                // Tell the producer, don't have enough slaves to handle the send request
-                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+        if (needHandleHA) {
+            if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode() && this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
+                // -1 means all ack in SyncStateSet
+                needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
+            } else {
+                int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
+                    this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
+                needAckNums = calcNeedAckNums(inSyncReplicas);
+                if (needAckNums > inSyncReplicas) {
+                    // Tell the producer, don't have enough slaves to handle the send request
+                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+                }
             }
         }
 
@@ -1101,8 +1110,7 @@ public class CommitLog implements Swappable {
 
     private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
         int needAckNums) {
-        final boolean allAckInSyncStateSet = this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet();
-        if (needAckNums <= 1 && !allAckInSyncStateSet) {
+        if (needAckNums >= 0 && needAckNums <= 1) {
             return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
         }
 
@@ -1122,7 +1130,7 @@ public class CommitLog implements Swappable {
 //        }
 
         // Wait enough acks from different slaves
-        GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums - 1, allAckInSyncStateSet);
+        GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
         haService.putRequest(request);
         haService.getWaitNotifyObject().wakeupAll();
         return request.future();
@@ -1389,7 +1397,6 @@ public class CommitLog implements Swappable {
         private final CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
         private volatile int ackNums = 1;
         private final long deadLine;
-        private boolean allAckInSyncStateSet;
 
         public GroupCommitRequest(long nextOffset, long timeoutMillis) {
             this.nextOffset = nextOffset;
@@ -1401,11 +1408,6 @@ public class CommitLog implements Swappable {
             this.ackNums = ackNums;
         }
 
-        public GroupCommitRequest(long nextOffset, long timeoutMillis, int ackNums, boolean allAckInSyncStateSet) {
-            this(nextOffset, timeoutMillis, ackNums);
-            this.allAckInSyncStateSet = allAckInSyncStateSet;
-        }
-
         public long getNextOffset() {
             return nextOffset;
         }
@@ -1418,10 +1420,6 @@ public class CommitLog implements Swappable {
             return deadLine;
         }
 
-        public boolean isAllAckInSyncStateSet() {
-            return allAckInSyncStateSet;
-        }
-
         public void wakeupCustomer(final PutMessageStatus status) {
             this.flushOKFuture.complete(status);
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
index 02198ad05..f84bdbf05 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.store.ha;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -74,14 +75,14 @@ public class GroupTransferService extends ServiceThread {
                     boolean transferOK = false;
 
                     long deadLine = req.getDeadLine();
-                    final boolean allAckInSyncStateSet = req.isAllAckInSyncStateSet();
+                    final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;
 
                     for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) {
                         if (i > 0) {
                             this.notifyTransferObject.waitForRunning(1000);
                         }
 
-                        if (req.getAckNums() <= 1 && !allAckInSyncStateSet) {
+                        if (!allAckInSyncStateSet && req.getAckNums() <= 1) {
                             transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
                             continue;
                         }
@@ -95,12 +96,12 @@ public class GroupTransferService extends ServiceThread {
                                 transferOK = true;
                                 break;
                             }
-                            // Include master.
+                            // Include master
                             int ackNums = 1;
                             for (HAConnection conn : haService.getConnectionList()) {
                                 final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
                                 if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
-                                    ackNums ++;
+                                    ackNums++;
                                 }
                                 if (ackNums >= syncStateSet.size()) {
                                     transferOK = true;
@@ -108,7 +109,8 @@ public class GroupTransferService extends ServiceThread {
                                 }
                             }
                         } else {
-                            int ackNums = 0;
+                            // Include master
+                            int ackNums = 1;
                             for (HAConnection conn : haService.getConnectionList()) {
                                 // TODO: We must ensure every HAConnection represents a different slave
                                 // Solution: Consider assign a unique and fixed IP:ADDR for each different slave
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java
index e55fedc97..5304bec46 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java
@@ -176,7 +176,7 @@ public class HAServerTest {
 
     @Test
     public void putRequest_SingleAck() throws IOException, ExecutionException, InterruptedException, TimeoutException {
-        CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000,1);
+        CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000, 1);
         this.haService.putRequest(request);
 
         assertThat(request.future().get()).isEqualTo(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
@@ -186,17 +186,17 @@ public class HAServerTest {
         doReturn(124L).when(messageStore).getMasterFlushedOffset();
         setUpOneHAClient(messageStore);
 
-        request = new CommitLog.GroupCommitRequest(124, 4000,1);
+        request = new CommitLog.GroupCommitRequest(124, 4000, 1);
         this.haService.putRequest(request);
         assertThat(request.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
     }
 
     @Test
     public void putRequest_MultipleAckAndRequests() throws IOException, ExecutionException, InterruptedException {
-        CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000,1);
+        CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000, 2);
         this.haService.putRequest(oneAck);
 
-        CommitLog.GroupCommitRequest twoAck = new CommitLog.GroupCommitRequest(124,4000, 2);
+        CommitLog.GroupCommitRequest twoAck = new CommitLog.GroupCommitRequest(124, 4000, 3);
         this.haService.putRequest(twoAck);
 
         DefaultMessageStore messageStore = mockMessageStore();
@@ -207,13 +207,12 @@ public class HAServerTest {
         assertThat(oneAck.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
         assertThat(twoAck.future().get()).isEqualTo(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
 
-
         messageStore = mockMessageStore();
         doReturn(128L).when(messageStore).getMaxPhyOffset();
         doReturn(128L).when(messageStore).getMasterFlushedOffset();
         setUpOneHAClient(messageStore);
 
-        twoAck = new CommitLog.GroupCommitRequest(124, 4000,2);
+        twoAck = new CommitLog.GroupCommitRequest(124, 4000, 3);
         this.haService.putRequest(twoAck);
         assertThat(twoAck.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
     }
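
Reviewer note: the out-of-sync rule reworded in the docs/cn/controller/design.md hunk above can be sketched as follows. This is a minimal illustration, not the AutoSwitchHAConnection implementation. The class OutOfSyncSketch and its method names are hypothetical. Only slaveAckOffset, lastMasterMaxOffset, lastCaughtUpTimeMs, lastTransferTimeMs and haMaxTimeSlaveNotCatchUp come from the document.

```java
// Hypothetical sketch of the Shrink SyncStateSet detection rule from design.md.
final class OutOfSyncSketch {
    // Last time this slave was known to have caught up with the master.
    private long lastCaughtUpTimeMs;

    // Called when the master reads an ack from the slave: the caught-up time
    // advances only if the ack covers the master's max offset at the last transfer.
    void onSlaveAck(long slaveAckOffset, long lastMasterMaxOffset, long lastTransferTimeMs) {
        if (slaveAckOffset >= lastMasterMaxOffset) {
            this.lastCaughtUpTimeMs = lastTransferTimeMs;
        }
    }

    // Periodic scan: the slave is out of sync once it has not caught up
    // for longer than haMaxTimeSlaveNotCatchUp milliseconds.
    boolean isOutOfSync(long curTimeMs, long haMaxTimeSlaveNotCatchUp) {
        return curTimeMs - lastCaughtUpTimeMs > haMaxTimeSlaveNotCatchUp;
    }
}
```

In the design described by the document, a slave flagged this way is reported by the master to the Controller, which then shrinks the SyncStateSet.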