You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/11 03:30:23 UTC
[rocketmq] branch develop updated: [ISSUE #6728] Compute the confirmOffset without considering new connections (#6729)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5dc2e20ef [ISSUE #6728] Compute the confirmOffset without considering new connections (#6729)
5dc2e20ef is described below
commit 5dc2e20efc9866f0f9d4f242d41ad4b4cfc65644
Author: Ji Juntao <ju...@alibaba-inc.com>
AuthorDate: Thu May 11 11:30:16 2023 +0800
[ISSUE #6728] Compute the confirmOffset without considering new connections (#6729)
* 1. When compute the confirmOffset, dismiss the ackOffset of new connections. 2. When compute the confirmOffset, use getConfirmOffsetDirectly() to avoid the endless calling.
* use the calculated slaveAckOffset
* optimize the logic.
---
.../java/org/apache/rocketmq/store/CommitLog.java | 22 +++++++++++++++++++++-
.../apache/rocketmq/store/DefaultMessageStore.java | 8 ++++++++
.../store/ha/autoswitch/AutoSwitchHAService.java | 5 +++--
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index ed5e320be..56f19529d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -553,15 +553,35 @@ public class CommitLog implements Swappable {
}
}
+ // Fetch and compute the newest confirmOffset.
+ // Even if it is just inited.
public long getConfirmOffset() {
if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
return this.defaultMessageStore.getMaxPhyOffset();
}
- // First time compute confirmOffset.
+ // First time it will compute the confirmOffset.
if (this.confirmOffset <= 0) {
setConfirmOffset(((AutoSwitchHAService) this.defaultMessageStore.getHaService()).computeConfirmOffset());
+ log.info("Init the confirmOffset to {}.", this.confirmOffset);
+ }
+ }
+ return this.confirmOffset;
+ } else if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+ return this.confirmOffset;
+ } else {
+ return getMaxOffset();
+ }
+ }
+
+ // Fetch the original confirmOffset's value.
+ // Without checking and re-computing.
+ public long getConfirmOffsetDirectly() {
+ if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+ if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
+ if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
+ return this.defaultMessageStore.getMaxPhyOffset();
}
}
return this.confirmOffset;
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 0b1f69ee7..6b0516b04 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1589,11 +1589,19 @@ public class DefaultMessageStore implements MessageStore {
}
}
+ // Fetch and compute the newest confirmOffset.
+ // Even if it is just inited.
@Override
public long getConfirmOffset() {
return this.commitLog.getConfirmOffset();
}
+ // Fetch the original confirmOffset's value.
+ // Without checking and re-computing.
+ public long getConfirmOffsetDirectly() {
+ return this.commitLog.getConfirmOffsetDirectly();
+ }
+
@Override
public void setConfirmOffset(long phyOffset) {
this.commitLog.setConfirmOffset(phyOffset);
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 3a918ee8e..6dc734e0c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -421,13 +421,14 @@ public class AutoSwitchHAService extends DefaultHAService {
for (Long syncId : currentSyncStateSet) {
if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
- return this.defaultMessageStore.getConfirmOffset();
+ // Without check and re-compute, return the confirmOffset's value directly.
+ return this.defaultMessageStore.getConfirmOffsetDirectly();
}
}
for (HAConnection connection : this.connectionList) {
final Long slaveId = ((AutoSwitchHAConnection) connection).getSlaveId();
- if (currentSyncStateSet.contains(slaveId)) {
+ if (currentSyncStateSet.contains(slaveId) && connection.getSlaveAckOffset() > 0) {
newConfirmOffset = Math.min(newConfirmOffset, connection.getSlaveAckOffset());
}
}