You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/13 06:41:52 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Use confirm offset in ReputMessageService (#4449)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new f038cf95d [Summer of code] Use confirm offset in ReputMessageService (#4449)
f038cf95d is described below
commit f038cf95d72700419706861796eddf5c57f6b6b4
Author: hzh0425 <64...@qq.com>
AuthorDate: Mon Jun 13 14:41:28 2022 +0800
[Summer of code] Use confirm offset in ReputMessageService (#4449)
* use confirm offset when controller mode
* fix bug
* record confirmOffset as a variable
* pass AutoSwitchHAService's test, add confirmOffset test.
* re trigger ci
* review
---
.../apache/rocketmq/store/DefaultMessageStore.java | 9 +++
.../store/ha/autoswitch/AutoSwitchHAClient.java | 6 +-
.../ha/autoswitch/AutoSwitchHAConnection.java | 1 +
.../store/ha/autoswitch/AutoSwitchHAService.java | 31 ++++++++-
.../store/ha/autoswitch/AutoSwitchHATest.java | 79 +++++++++++++++++++---
5 files changed, 113 insertions(+), 13 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7c57cf11f..8dc2e9a3a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1393,6 +1393,12 @@ public class DefaultMessageStore implements MessageStore {
@Override
public long getConfirmOffset() {
+ if (this.brokerConfig.isEnableControllerMode()) {
+ long confirmOffset = ((AutoSwitchHAService)this.haService).getConfirmOffset();
+ if (confirmOffset > 0) {
+ return confirmOffset;
+ }
+ }
return this.commitLog.getConfirmOffset();
}
@@ -2408,6 +2414,9 @@ public class DefaultMessageStore implements MessageStore {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) {
return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
}
+ if (DefaultMessageStore.this.getBrokerConfig().isEnableControllerMode()) {
+ return this.reputFromOffset <= ((AutoSwitchHAService)DefaultMessageStore.this.haService).getConfirmOffset();
+ }
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 997deb7e8..104b50fa0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -189,6 +189,10 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
return this.currentState;
}
+ public long getConfirmOffset() {
+ return confirmOffset;
+ }
+
@Override public void changeCurrentState(HAConnectionState haConnectionState) {
LOGGER.info("change state to {}", haConnectionState);
this.currentState = haConnectionState;
@@ -495,11 +499,11 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, masterEpochStartOffset));
}
- AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
if (bodySize > 0) {
AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length);
}
+ AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
if (!reportSlaveMaxOffset()) {
LOGGER.error("AutoSwitchHAClient report max offset to master failed");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 7d689b69a..d7bfff3b7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -322,6 +322,7 @@ public class AutoSwitchHAConnection implements HAConnection {
}
byteBufferRead.position(readSocketPos);
maybeExpandInSyncStateSet(slaveMaxOffset);
+ AutoSwitchHAConnection.this.haService.updateConfirmOffsetWhenSlaveAck(AutoSwitchHAConnection.this.slaveAddress);
AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
LOGGER.debug("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
break;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 9fab640f2..c0ced3158 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.ha.DefaultHAService;
import org.apache.rocketmq.store.ha.GroupTransferService;
import org.apache.rocketmq.store.ha.HAClient;
@@ -52,11 +53,14 @@ public class AutoSwitchHAService extends DefaultHAService {
private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>();
private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>();
private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
+ private volatile long confirmOffset;
+
private String localAddress;
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
+
public AutoSwitchHAService() {
}
@@ -115,7 +119,6 @@ public class AutoSwitchHAService extends DefaultHAService {
this.epochCache.appendEntry(newEpochEntry);
this.defaultMessageStore.recoverTopicQueueTable();
-
LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
return true;
}
@@ -220,10 +223,33 @@ public class AutoSwitchHAService extends DefaultHAService {
* Get confirm offset (min slaveAckOffset of all syncStateSet members)
*/
public long getConfirmOffset() {
+ if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER) {
+ if (this.syncStateSet.size() == 1) {
+ return this.defaultMessageStore.getMaxPhyOffset();
+ }
+ // First time compute confirmOffset.
+ if (this.confirmOffset <= 0) {
+ this.confirmOffset = computeConfirmOffset();
+ }
+ return this.confirmOffset;
+ } else if (this.haClient != null) {
+ return this.haClient.getConfirmOffset();
+ }
+ return -1;
+ }
+
+ public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) {
+ if (this.syncStateSet.contains(slaveAddress)) {
+ this.confirmOffset = computeConfirmOffset();
+ }
+ }
+
+ private long computeConfirmOffset() {
final Set<String> currentSyncStateSet = getSyncStateSet();
long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
for (HAConnection connection : this.connectionList) {
- if (currentSyncStateSet.contains(connection.getClientAddress())) {
+ final String slaveAddress = ((AutoSwitchHAConnection)connection).getSlaveAddress();
+ if (currentSyncStateSet.contains(slaveAddress)) {
confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
}
}
@@ -233,6 +259,7 @@ public class AutoSwitchHAService extends DefaultHAService {
public synchronized void setSyncStateSet(final Set<String> syncStateSet) {
this.syncStateSet.clear();
this.syncStateSet.addAll(syncStateSet);
+ this.confirmOffset = computeConfirmOffset();
}
public synchronized Set<String> getSyncStateSet() {
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index dfbc35f81..d4e690811 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -121,6 +122,10 @@ public class AutoSwitchHATest {
messageStore1.start();
messageStore2.start();
messageStore3.start();
+
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+ ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
+ ((AutoSwitchHAService) this.messageStore3.getHaService()).setLocalAddress("127.0.0.1:8002");
}
public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws Exception {
@@ -154,6 +159,9 @@ public class AutoSwitchHATest {
assertTrue(messageStore2.load());
messageStore1.start();
messageStore2.start();
+
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
+ ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
}
private void changeMasterAndPutMessage(DefaultMessageStore master, MessageStoreConfig masterConfig,
@@ -172,7 +180,7 @@ public class AutoSwitchHATest {
for (int i = 0; i < totalPutMessageNums; i++) {
master.putMessage(buildMessage());
}
- Thread.sleep(200);
+ Thread.sleep(1000);
}
private void checkMessage(final DefaultMessageStore messageStore, int totalMsgs, int startOffset) {
@@ -187,12 +195,61 @@ public class AutoSwitchHATest {
}
}
+ @Test
+ public void testConfirmOffset() throws Exception {
+ init(defaultMappedFileSize, true);
+ // Step1, set syncStateSet, if both broker1 and broker2 are in syncStateSet, the confirmOffset will be computed as the min slaveAckOffset(broker2's ack)
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Arrays.asList("127.0.0.1:8000", "127.0.0.1:8001")));
+ changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+ checkMessage(this.messageStore2, 10, 0);
+
+ final long confirmOffset = ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset();
+
+ // Step2, shutdown store2
+ this.messageStore2.shutdown();
+
+ // Put some messages, which should put failed.
+ for (int i = 0; i < 3; i++) {
+ final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage());
+ assertEquals(putMessageResult.getPutMessageStatus(), PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+ }
+
+ // The confirmOffset still don't change, because syncStateSet contains broker2, but broker2 shutdown
+ assertEquals(confirmOffset, ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset());
+
+ // Step3, shutdown store1, start store2, change store2 to master, epoch = 2
+ this.messageStore1.shutdown();
+
+ storeConfig2.setBrokerRole(BrokerRole.SYNC_MASTER);
+ messageStore2 = buildMessageStore(storeConfig2, 2L);
+ assertTrue(messageStore2.load());
+ messageStore2.start();
+ messageStore2.getHaService().changeToMaster(2);
+ ((AutoSwitchHAService) messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001")));
+ Thread.sleep(6000);
+
+ // Put message on master
+ for (int i = 0; i < 10; i++) {
+ messageStore2.putMessage(buildMessage());
+ }
+ Thread.sleep(200);
+
+ // Step4, start store1, it should truncate dirty logs and syncLog from store2
+ storeConfig1.setBrokerRole(BrokerRole.SLAVE);
+ messageStore1 = buildMessageStore(storeConfig1, 1L);
+ assertTrue(messageStore1.load());
+ messageStore1.start();
+ messageStore1.getHaService().changeToSlave("", 2, 1L);
+ messageStore1.getHaService().updateHaMasterAddress(this.store2HaAddress);
+ Thread.sleep(6000);
+
+ checkMessage(this.messageStore1, 20, 0);
+ }
+
@Test
public void testAsyncLearnerBrokerRole() throws Exception {
init(defaultMappedFileSize);
- ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
- ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
storeConfig2.setBrokerRole(BrokerRole.SLAVE);
@@ -201,7 +258,6 @@ public class AutoSwitchHATest {
messageStore2.getHaService().changeToSlave("", 1, 2L);
messageStore2.getHaService().updateHaMasterAddress(store1HaAddress);
Thread.sleep(6000);
-
// Put message on master
for (int i = 0; i < 10; i++) {
messageStore1.putMessage(buildMessage());
@@ -219,16 +275,13 @@ public class AutoSwitchHATest {
public void testOptionAllAckInSyncStateSet() throws Exception {
init(defaultMappedFileSize, true);
AtomicReference<Set<String>> syncStateSet = new AtomicReference<>();
- ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
- ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
((AutoSwitchHAService) this.messageStore1.getHaService()).registerSyncStateSetChangedListener((newSyncStateSet) -> {
System.out.println("Get newSyncStateSet:" + newSyncStateSet);
syncStateSet.set(newSyncStateSet);
});
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
- Thread.sleep(1000);
checkMessage(this.messageStore2, 10, 0);
Thread.sleep(1000);
@@ -251,10 +304,12 @@ public class AutoSwitchHATest {
public void testChangeRoleManyTimes() throws Exception {
// Step1, change store1 to master, store2 to follower
init(defaultMappedFileSize);
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
checkMessage(this.messageStore2, 10, 0);
// Step2, change store1 to follower, store2 to master, epoch = 2
+ ((AutoSwitchHAService) this.messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001")));
changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore1, 1, this.storeConfig1, 2, store2HaAddress, 10);
checkMessage(this.messageStore1, 20, 0);
@@ -267,6 +322,8 @@ public class AutoSwitchHATest {
public void testAddBroker() throws Exception {
// Step1: broker1 as leader, broker2 as follower
init(defaultMappedFileSize);
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
+
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
checkMessage(this.messageStore2, 10, 0);
@@ -285,8 +342,10 @@ public class AutoSwitchHATest {
// Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs);
// Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
checkMessage(this.messageStore2, 10, 0);
+
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);
checkMessage(this.messageStore2, 20, 0);
@@ -321,6 +380,7 @@ public class AutoSwitchHATest {
// Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs);
// Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
checkMessage(this.messageStore2, 10, 0);
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);
@@ -348,6 +408,7 @@ public class AutoSwitchHATest {
checkMessage(messageStore3, 10, 10);
// Step5: change broker2 as leader, broker3 as follower
+ ((AutoSwitchHAService) this.messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8001")));
changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore3, 3, this.storeConfig3, 3, this.store2HaAddress, 10);
checkMessage(messageStore3, 20, 10);
@@ -359,19 +420,18 @@ public class AutoSwitchHATest {
checkMessage(messageStore1, 20, 0);
}
-
@Test
public void testAddBrokerAndSyncFromLastFile() throws Exception {
init(1700);
// Step1: broker1 as leader, broker2 as follower, append 2 epoch, each epoch will be stored on one file(Because fileSize = 1700, which only can hold 10 msgs);
// Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+ ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
checkMessage(this.messageStore2, 10, 0);
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 2, store1HaAddress, 10);
checkMessage(this.messageStore2, 20, 0);
-
// Step2: restart broker3
messageStore3.shutdown();
messageStore3.destroy();
@@ -388,7 +448,6 @@ public class AutoSwitchHATest {
checkMessage(messageStore3, 10, 10);
}
-
@After
public void destroy() throws Exception {
Thread.sleep(5000L);