You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/05/01 06:36:13 UTC
[rocketmq] branch develop updated: [ISSUE #6662] Optimize the process of HA's confirmOffset calculation (#6663)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7903c7041 [ISSUE #6662] Optimize the process of HA's confirmOffset calculation (#6663)
7903c7041 is described below
commit 7903c70412fc4d20baafd7b75ca93788551fb88f
Author: Ji Juntao <ju...@alibaba-inc.com>
AuthorDate: Mon May 1 14:35:52 2023 +0800
[ISSUE #6662] Optimize the process of HA's confirmOffset calculation (#6663)
* When computing confirmOffset, check whether all slaves in the syncStateSet are connected to the master.
* Replace brokerId with brokerControllerId.
* Optimize the code: 1. Set brokerId when the ReplicasManager is initialized. 2. Rename the confusingly named local variable (confirmOffset -> newConfirmOffset).
* Set haService's brokerControllerId after registration.
* Set haService's brokerControllerId when the ReplicasManager starts without a brokerIdentity file.
---
.../broker/controller/ReplicasManager.java | 2 ++
.../store/ha/autoswitch/AutoSwitchHAService.java | 29 +++++++++++++++++++---
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index c39e33ad1..3c7e061a2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -549,6 +549,7 @@ public class ReplicasManager {
this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
this.tempBrokerMetadata.clear();
this.brokerControllerId = this.brokerMetadata.getBrokerId();
+ this.haService.setBrokerControllerId(this.brokerControllerId);
return true;
} catch (Exception e) {
LOGGER.error("fail to create metadata file", e);
@@ -600,6 +601,7 @@ public class ReplicasManager {
if (this.brokerMetadata.isLoaded()) {
this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
this.brokerControllerId = brokerMetadata.getBrokerId();
+ this.haService.setBrokerControllerId(this.brokerControllerId);
return;
}
// 2. check if temp metadata exist
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 80249bc32..d1e623ca7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -50,6 +51,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* SwitchAble ha service, support switch role to master or slave.
@@ -72,6 +74,8 @@ public class AutoSwitchHAService extends DefaultHAService {
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
+ private Long brokerControllerId = null;
+
public AutoSwitchHAService() {
}
@@ -427,14 +431,25 @@ public class AutoSwitchHAService extends DefaultHAService {
private long computeConfirmOffset() {
final Set<Long> currentSyncStateSet = getSyncStateSet();
- long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+ long newConfirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+ List<Long> idList = this.connectionList.stream().map(connection -> ((AutoSwitchHAConnection)connection).getSlaveId()).collect(Collectors.toList());
+
+ // To avoid the syncStateSet is not consistent with connectionList.
+ // Fix issue: https://github.com/apache/rocketmq/issues/6662
+ for (Long syncId : currentSyncStateSet) {
+ if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
+ LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
+ return this.confirmOffset;
+ }
+ }
+
for (HAConnection connection : this.connectionList) {
final Long slaveId = ((AutoSwitchHAConnection) connection).getSlaveId();
if (currentSyncStateSet.contains(slaveId)) {
- confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
+ newConfirmOffset = Math.min(newConfirmOffset, connection.getSlaveAckOffset());
}
}
- return confirmOffset;
+ return newConfirmOffset;
}
public void setSyncStateSet(final Set<Long> syncStateSet) {
@@ -545,6 +560,14 @@ public class AutoSwitchHAService extends DefaultHAService {
return this.epochCache.getAllEntries();
}
+ public Long getBrokerControllerId() {
+ return brokerControllerId;
+ }
+
+ public void setBrokerControllerId(Long brokerControllerId) {
+ this.brokerControllerId = brokerControllerId;
+ }
+
class AutoSwitchAcceptSocketService extends AcceptSocketService {
public AutoSwitchAcceptSocketService(final MessageStoreConfig messageStoreConfig) {