You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/16 03:58:17 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: Remove allAckInSyncStateSet in GroupCommitRequest and use ackNums = -1 instead (#4467)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 3fbcad5f9 Remove allAckInSyncStateSet in GroupCommitRequest and use ackNums = -1 instead (#4467)
3fbcad5f9 is described below
commit 3fbcad5f91fe08c51283d1b065c9b352821e2880
Author: rongtong <ji...@163.com>
AuthorDate: Thu Jun 16 11:58:11 2022 +0800
Remove allAckInSyncStateSet in GroupCommitRequest and use ackNums = -1 instead (#4467)
* Remove allAckInSyncStateSet in GroupCommitRequest and use ackNums = -1 instead
* Pass the UT
* Format the code style
* Do the same for batch messages and polish the code
* Polish the constant name
* Polish the document
* Polish the document
---
.../java/org/apache/rocketmq/common/MixAll.java | 1 +
docs/cn/controller/design.md | 11 ++---
docs/cn/controller/quick_start.md | 6 +--
.../java/org/apache/rocketmq/store/CommitLog.java | 54 +++++++++++-----------
.../rocketmq/store/ha/GroupTransferService.java | 12 +++--
.../org/apache/rocketmq/store/ha/HAServerTest.java | 11 ++---
6 files changed, 46 insertions(+), 49 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 638d03806..1885a3cf3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -77,6 +77,7 @@ public class MixAll {
public static final long FIRST_SLAVE_ID = 1L;
public static final long CURRENT_JVM_PID = getPID();
public final static int UNIT_PRE_SIZE_FOR_MSG = 28;
+ public final static int ALL_ACK_IN_SYNC_STATE_SET = -1;
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
diff --git a/docs/cn/controller/design.md b/docs/cn/controller/design.md
index 0080986e1..e100881fd 100644
--- a/docs/cn/controller/design.md
+++ b/docs/cn/controller/design.md
@@ -2,9 +2,8 @@
当前 RocketMQ Raft 模式主要是利用 DLedger Commitlog 替换原来的 Commitlog,使 Commitlog 拥有选举复制能力,但这也造成了一些问题:
-- Raft 模式下,Broker组内副本数必须是三副本及以上。
+- Raft 模式下,Broker组内副本数必须是三副本及以上,副本的ACK也必须遵循多数派协议。
- RocketMQ 存在两套 HA 复制流程,且 Raft 模式下的复制无法利用 RocketMQ 原生的存储能力。
-- Raft 模式下, 日志复制性能并不高效。
因此我们希望利用 DLedger 实现一个基于 Raft 的一致性模块(DLedger Controller),并当作一个可选的选主组件, 支持独立部署, 也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成 Master 的选举, 从而解决上述问题, 我们将该新模式称为 Controller 模式。
@@ -27,7 +26,7 @@
![image-20220605213143645](../image/controller/quick-start/controller.png)
-如果, 是 DledgerController 的核心设计:
+如图是 DledgerController 的核心设计:
- DLedgerController 可以内嵌在 Namesrv 中, 也可以独立的部署。
- Active DLedgerController 是 DLedger 选举出来的 Leader, 其会接受来自客户端的事件请求, 并通过 DLedger 发起共识, 最后应用到内存元数据状态机中。
@@ -195,11 +194,9 @@ Shrink SyncStateSet ,指把 SyncStateSet 副本集合中那些与Master差距
- ReadSocketService 接收到 slaveAckOffset 时若 slaveAckOffset >= lastMasterMaxOffset 则将lastCaughtUpTimeMs 更新为 lastTransferTimeMs。
-- Master 端通过定时任务扫描每一个 HaConnection, 如果 (cur_time - connection.lastCaughtUpTimeMs) >
+- Master 端通过定时任务扫描每一个 HaConnection, 如果 (cur_time - connection.lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchUp,则该 Slave 是 Out-of-sync 的。
- haMaxTimeSlaveNotCatchUp,则该 Slave 是 Out-of-sync 的 。
-
-- 如果检测到 Slave out of sync , master 会立刻和 Controller 上报, 从而 Shrink SyncStateSet 。
+- 如果检测到 Slave out of sync , master 会立刻和 Controller 上报, 从而 Shrink SyncStateSet。
#### Expand
diff --git a/docs/cn/controller/quick_start.md b/docs/cn/controller/quick_start.md
index c125b48ab..b3c86d75d 100644
--- a/docs/cn/controller/quick_start.md
+++ b/docs/cn/controller/quick_start.md
@@ -2,7 +2,7 @@
## 前言
-该文档主要介绍如何快速构建和部署基于 Controller 的可以自动容灾切换的 RocketMQ 集群。
+该文档主要介绍如何快速构建和部署基于 Controller 的可以自动切换的 RocketMQ 集群。
详细的新集群部署和旧集群升级指南请参考 [部署指南](deploy_guide.md)。
@@ -26,7 +26,7 @@
如果上面的步骤执行成功,可以通过运维命令查看集群状态。
-至此, 启动成功,现在可以向集群收发消息,并进行容灾切换测试了。
+至此, 启动成功,现在可以向集群收发消息,并进行切换测试了。
如果需要关闭快速集群,可以执行:
@@ -58,7 +58,7 @@
![image-20220605205247476](../image/controller/quick-start/epoch.png)
-## 容灾切换
+## 切换
部署成功后,现在尝试进行 Master 切换。
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b95804bb3..75213b83a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -349,7 +349,6 @@ public class CommitLog implements Swappable {
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
-
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
@@ -803,13 +802,18 @@ public class CommitLog implements Swappable {
boolean needHandleHA = needHandleHA(msg);
int needAckNums = 1;
- if (needHandleHA && !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
- int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
- this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
- needAckNums = calcNeedAckNums(inSyncReplicas);
- if (needAckNums > inSyncReplicas) {
- // Tell the producer, don't have enough slaves to handle the send request
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+ if (needHandleHA) {
+ if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode() && this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
+ // -1 means all ack in SyncStateSet
+ needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
+ } else {
+ int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
+ this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
+ needAckNums = calcNeedAckNums(inSyncReplicas);
+ if (needAckNums > inSyncReplicas) {
+ // Tell the producer, don't have enough slaves to handle the send request
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+ }
}
}
@@ -950,13 +954,18 @@ public class CommitLog implements Swappable {
int needAckNums = 1;
boolean needHandleHA = needHandleHA(messageExtBatch);
- if (needHandleHA && !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
- int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
- this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
- needAckNums = calcNeedAckNums(inSyncReplicas);
- if (needAckNums > inSyncReplicas) {
- // Tell the producer, don't have enough slaves to handle the send request
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+ if (needHandleHA) {
+ if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode() && this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
+ // -1 means all ack in SyncStateSet
+ needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
+ } else {
+ int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
+ this.defaultMessageStore.getHaService().inSyncSlaveNums(currOffset) + 1);
+ needAckNums = calcNeedAckNums(inSyncReplicas);
+ if (needAckNums > inSyncReplicas) {
+ // Tell the producer, don't have enough slaves to handle the send request
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
+ }
}
}
@@ -1101,8 +1110,7 @@ public class CommitLog implements Swappable {
private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
int needAckNums) {
- final boolean allAckInSyncStateSet = this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet();
- if (needAckNums <= 1 && !allAckInSyncStateSet) {
+ if (needAckNums >= 0 && needAckNums <= 1) {
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
@@ -1122,7 +1130,7 @@ public class CommitLog implements Swappable {
// }
// Wait enough acks from different slaves
- GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums - 1, allAckInSyncStateSet);
+ GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
haService.putRequest(request);
haService.getWaitNotifyObject().wakeupAll();
return request.future();
@@ -1389,7 +1397,6 @@ public class CommitLog implements Swappable {
private final CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
private volatile int ackNums = 1;
private final long deadLine;
- private boolean allAckInSyncStateSet;
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
@@ -1401,11 +1408,6 @@ public class CommitLog implements Swappable {
this.ackNums = ackNums;
}
- public GroupCommitRequest(long nextOffset, long timeoutMillis, int ackNums, boolean allAckInSyncStateSet) {
- this(nextOffset, timeoutMillis, ackNums);
- this.allAckInSyncStateSet = allAckInSyncStateSet;
- }
-
public long getNextOffset() {
return nextOffset;
}
@@ -1418,10 +1420,6 @@ public class CommitLog implements Swappable {
return deadLine;
}
- public boolean isAllAckInSyncStateSet() {
- return allAckInSyncStateSet;
- }
-
public void wakeupCustomer(final PutMessageStatus status) {
this.flushOKFuture.complete(status);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
index 02198ad05..f84bdbf05 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/GroupTransferService.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.store.ha;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -74,14 +75,14 @@ public class GroupTransferService extends ServiceThread {
boolean transferOK = false;
long deadLine = req.getDeadLine();
- final boolean allAckInSyncStateSet = req.isAllAckInSyncStateSet();
+ final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;
for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) {
if (i > 0) {
this.notifyTransferObject.waitForRunning(1000);
}
- if (req.getAckNums() <= 1 && !allAckInSyncStateSet) {
+ if (!allAckInSyncStateSet && req.getAckNums() <= 1) {
transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
continue;
}
@@ -95,12 +96,12 @@ public class GroupTransferService extends ServiceThread {
transferOK = true;
break;
}
- // Include master.
+ // Include master
int ackNums = 1;
for (HAConnection conn : haService.getConnectionList()) {
final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
- ackNums ++;
+ ackNums++;
}
if (ackNums >= syncStateSet.size()) {
transferOK = true;
@@ -108,7 +109,8 @@ public class GroupTransferService extends ServiceThread {
}
}
} else {
- int ackNums = 0;
+ // Include master
+ int ackNums = 1;
for (HAConnection conn : haService.getConnectionList()) {
// TODO: We must ensure every HAConnection represents a different slave
// Solution: Consider assign a unique and fixed IP:ADDR for each different slave
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java
index e55fedc97..5304bec46 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java
@@ -176,7 +176,7 @@ public class HAServerTest {
@Test
public void putRequest_SingleAck() throws IOException, ExecutionException, InterruptedException, TimeoutException {
- CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000,1);
+ CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000, 1);
this.haService.putRequest(request);
assertThat(request.future().get()).isEqualTo(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
@@ -186,17 +186,17 @@ public class HAServerTest {
doReturn(124L).when(messageStore).getMasterFlushedOffset();
setUpOneHAClient(messageStore);
- request = new CommitLog.GroupCommitRequest(124, 4000,1);
+ request = new CommitLog.GroupCommitRequest(124, 4000, 1);
this.haService.putRequest(request);
assertThat(request.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
}
@Test
public void putRequest_MultipleAckAndRequests() throws IOException, ExecutionException, InterruptedException {
- CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000,1);
+ CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000, 2);
this.haService.putRequest(oneAck);
- CommitLog.GroupCommitRequest twoAck = new CommitLog.GroupCommitRequest(124,4000, 2);
+ CommitLog.GroupCommitRequest twoAck = new CommitLog.GroupCommitRequest(124, 4000, 3);
this.haService.putRequest(twoAck);
DefaultMessageStore messageStore = mockMessageStore();
@@ -207,13 +207,12 @@ public class HAServerTest {
assertThat(oneAck.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
assertThat(twoAck.future().get()).isEqualTo(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
-
messageStore = mockMessageStore();
doReturn(128L).when(messageStore).getMaxPhyOffset();
doReturn(128L).when(messageStore).getMasterFlushedOffset();
setUpOneHAClient(messageStore);
- twoAck = new CommitLog.GroupCommitRequest(124, 4000,2);
+ twoAck = new CommitLog.GroupCommitRequest(124, 4000, 3);
this.haService.putRequest(twoAck);
assertThat(twoAck.future().get()).isEqualTo(PutMessageStatus.PUT_OK);
}