You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/06 08:34:49 UTC
[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of Code] Support switch role for ha service (#4236)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
new 911ee340c [Summer of Code] Support switch role for ha service (#4236)
911ee340c is described below
commit 911ee340cb502645cd75603f44ba72c198611434
Author: hzh0425 <58...@users.noreply.github.com>
AuthorDate: Fri May 6 16:34:27 2022 +0800
[Summer of Code] Support switch role for ha service (#4236)
* feature: support auto switch role ha service
* feature:
1.add EpochStartOffset in ha protocol
2.notify AutoSwitchHAService when delete expired files
3.add more tests for AutoSwitchHAService
* feature:
1.transfer syncFromLastFile from slave to master in handshake state
2.return false if find consistent point failed
---
.../org/apache/rocketmq/common/EpochEntry.java | 68 +++
.../rocketmq/common/utils/CheckpointFile.java | 122 ++++
.../namesrv/routeinfo/RouteInfoManager.java | 1 -
.../apache/rocketmq/store/DefaultMessageStore.java | 17 +-
.../rocketmq/store/config/MessageStoreConfig.java | 40 ++
.../apache/rocketmq/store/ha/DefaultHAService.java | 70 ++-
.../org/apache/rocketmq/store/ha/HAService.java | 21 +
.../store/ha/autoswitch/AutoSwitchHAClient.java | 503 ++++++++++++++++
.../ha/autoswitch/AutoSwitchHAConnection.java | 630 +++++++++++++++++++++
.../store/ha/autoswitch/AutoSwitchHAService.java | 215 +++++++
.../store/ha/autoswitch/EpochFileCache.java | 328 +++++++++++
.../rocketmq/store/ha/io/AbstractHAReader.java | 80 +++
.../apache/rocketmq/store/ha/io/HAReadHook.java | 22 +
.../apache/rocketmq/store/ha/io/HAWriteHook.java | 22 +
.../org/apache/rocketmq/store/ha/io/HAWriter.java | 67 +++
.../rocketmq/store/ha/EpochFileCacheTest.java | 150 +++++
.../store/ha/autoswitch/AutoSwitchHATest.java | 332 +++++++++++
17 files changed, 2662 insertions(+), 26 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java b/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java
new file mode 100644
index 000000000..6cd3ceb23
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+ /**
+  * Value holder for one epoch record: an epoch number together with a start offset and an
+  * end offset. The end offset defaults to {@code Long.MAX_VALUE} until it is set explicitly.
+  */
+ public class EpochEntry {
+
+     private int epoch;
+     private long startOffset;
+     private long endOffset = Long.MAX_VALUE;
+
+     /**
+      * Creates an entry with the given epoch and start offset; the end offset keeps its default.
+      */
+     public EpochEntry(int epoch, long startOffset) {
+         this.epoch = epoch;
+         this.startOffset = startOffset;
+     }
+
+     /**
+      * Copy constructor: duplicates epoch, start offset and end offset of {@code entry}.
+      */
+     public EpochEntry(EpochEntry entry) {
+         this(entry.getEpoch(), entry.getStartOffset());
+         this.endOffset = entry.getEndOffset();
+     }
+
+     public int getEpoch() {
+         return this.epoch;
+     }
+
+     public void setEpoch(int epoch) {
+         this.epoch = epoch;
+     }
+
+     public long getStartOffset() {
+         return this.startOffset;
+     }
+
+     public void setStartOffset(long startOffset) {
+         this.startOffset = startOffset;
+     }
+
+     public long getEndOffset() {
+         return this.endOffset;
+     }
+
+     public void setEndOffset(long endOffset) {
+         this.endOffset = endOffset;
+     }
+
+     /**
+      * Renders the three fields as {@code EpochEntry{epoch=..., startOffset=..., endOffset=...}}.
+      */
+     @Override
+     public String toString() {
+         final StringBuilder text = new StringBuilder("EpochEntry{");
+         text.append("epoch=").append(this.epoch);
+         text.append(", startOffset=").append(this.startOffset);
+         text.append(", endOffset=").append(this.endOffset);
+         text.append('}');
+         return text.toString();
+     }
+ }
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/CheckpointFile.java b/common/src/main/java/org/apache/rocketmq/common/utils/CheckpointFile.java
new file mode 100644
index 000000000..e47a2cd5c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/CheckpointFile.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Entry Checkpoint file util
+ * Format:
+ * First line: Entries size
+ * Then: every Entry(String)
+ */
+ public class CheckpointFile<T> {
+
+     /**
+      * Converts entries to and from their single-line text form.
+      */
+     public interface CheckpointSerializer<T> {
+         /**
+          * Serialize entry to line
+          */
+         String toLine(final T entry);
+
+         /**
+          * DeSerialize line to entry
+          */
+         T fromLine(final String line);
+     }
+
+     private final String path;
+     private final CheckpointSerializer<T> serializer;
+
+     /**
+      * @param path       location of the checkpoint file
+      * @param serializer converts entries to and from lines
+      */
+     public CheckpointFile(final String path, final CheckpointSerializer<T> serializer) {
+         this.path = path;
+         this.serializer = serializer;
+     }
+
+     /**
+      * Write entries to file. Does nothing when {@code entries} is empty.
+      * Entries whose serialized form is null or empty are skipped, and the size header
+      * records the number of lines actually written so that {@link #read()} accepts the file.
+      *
+      * @param entries entries to persist
+      * @throws IOException if the file cannot be opened, written or synced
+      */
+     public void write(final List<T> entries) throws IOException {
+         if (entries.isEmpty()) {
+             return;
+         }
+         synchronized (this) {
+             // Serialize first: the header must match the number of lines that reach the file,
+             // otherwise read() rejects the checkpoint with a count mismatch.
+             final List<String> lines = new ArrayList<>(entries.size());
+             for (T entry : entries) {
+                 final String line = this.serializer.toLine(entry);
+                 if (line != null && !line.isEmpty()) {
+                     lines.add(line);
+                 }
+             }
+
+             // Both resources are managed so the stream is released even if wrapping it fails.
+             try (FileOutputStream fos = new FileOutputStream(this.path);
+                  BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
+                 // Write size
+                 writer.write(lines.size() + "");
+                 writer.newLine();
+
+                 // Write entries
+                 for (String line : lines) {
+                     writer.write(line);
+                     writer.newLine();
+                 }
+
+                 writer.flush();
+                 fos.getFD().sync();
+             }
+         }
+     }
+
+     /**
+      * Read entries from file.
+      *
+      * @return the decoded entries, or an empty list when the file does not exist
+      * @throws IOException if the file cannot be read, its size header is missing or not a
+      *                     number, or the number of decoded entries differs from the header
+      */
+     public List<T> read() throws IOException {
+         final ArrayList<T> result = new ArrayList<>();
+         synchronized (this) {
+             final File file = new File(this.path);
+             if (!file.exists()) {
+                 return result;
+             }
+             try (BufferedReader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)) {
+                 // Read size; an empty or damaged file is reported as IOException rather than
+                 // leaking an unchecked NumberFormatException to the caller.
+                 final String header = reader.readLine();
+                 if (header == null) {
+                     throw new IOException("Checkpoint file " + this.path + " is empty, entries size is missing");
+                 }
+                 final int expectedLines;
+                 try {
+                     expectedLines = Integer.parseInt(header.trim());
+                 } catch (NumberFormatException e) {
+                     throw new IOException("Checkpoint file " + this.path + " has invalid entries size: " + header, e);
+                 }
+
+                 // Read entries
+                 String line = reader.readLine();
+                 while (line != null) {
+                     final T entry = this.serializer.fromLine(line);
+                     if (entry != null) {
+                         result.add(entry);
+                     }
+                     line = reader.readLine();
+                 }
+                 if (result.size() != expectedLines) {
+                     final String err = String.format("Expect %d entries, only found %d entries", expectedLines, result.size());
+                     throw new IOException(err);
+                 }
+                 return result;
+             }
+         }
+     }
+ }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index dd206bbb4..89b807744 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -692,7 +692,6 @@ public class RouteInfoManager {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
- e.printStackTrace();
log.error("unregisterBroker Exception", e);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index dc6cbf591..b3d9e6556 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -77,6 +77,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.ha.DefaultHAService;
import org.apache.rocketmq.store.ha.HAService;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.hook.PutMessageHook;
import org.apache.rocketmq.store.hook.SendMessageBackHook;
import org.apache.rocketmq.store.index.IndexService;
@@ -189,8 +190,13 @@ public class DefaultMessageStore implements MessageStore {
if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
this.haService = ServiceProvider.loadClass(ServiceProvider.HA_SERVICE_ID, HAService.class);
if (null == this.haService) {
- this.haService = new DefaultHAService();
- LOGGER.warn("Load default HA Service: {}", DefaultHAService.class.getSimpleName());
+ if (this.messageStoreConfig.isStartupControllerMode()) {
+ this.haService = new AutoSwitchHAService();
+ LOGGER.warn("Load AutoSwitch HA Service: {}", AutoSwitchHAService.class.getSimpleName());
+ } else {
+ this.haService = new DefaultHAService();
+ LOGGER.warn("Load default HA Service: {}", DefaultHAService.class.getSimpleName());
+ }
}
this.haService.init(this);
}
@@ -1932,6 +1938,13 @@ public class DefaultMessageStore implements MessageStore {
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
if (deleteCount > 0) {
+ // If in the controller mode, we should notify the AutoSwitchHaService to truncateEpochFile
+ if (DefaultMessageStore.this.messageStoreConfig.isStartupControllerMode()) {
+ if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {
+ final long minPhyOffset = getMinPhyOffset();
+ ((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
+ }
+ }
} else if (isUsageExceedsThreshold) {
LOGGER.warn("disk space will be full soon, but delete file failed.");
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 1a6d6ae48..99b88d521 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -33,6 +33,11 @@ public class MessageStoreConfig {
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
+ //The path where the epoch file checkpoint is kept
+ @ImportantField
+ private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store"
+ + File.separator + "commitlog" + File.separator + "epochFileCheckpoint";
+
private String readOnlyCommitLogStorePaths = null;
// CommitLog file size,default is 1G
@@ -248,6 +253,12 @@ public class MessageStoreConfig {
@ImportantField
private boolean enableAutoInSyncReplicas = false;
+ /**
+ * Whether startup controller mode
+ */
+ @ImportantField
+ private boolean startupControllerMode = false;
+
/**
* Enable or not ha flow control
*/
@@ -285,6 +296,11 @@ public class MessageStoreConfig {
private boolean syncFromMinOffset = false;
+ /**
+ * Whether sync from lastFile when a new broker replicas join the master.
+ */
+ private boolean syncFromLastFile = false;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -459,6 +475,14 @@ public class MessageStoreConfig {
this.storePathCommitLog = storePathCommitLog;
}
+ public String getStorePathEpochFile() {
+ return storePathEpochFile;
+ }
+
+ public void setStorePathEpochFile(String storePathEpochFile) {
+ this.storePathEpochFile = storePathEpochFile;
+ }
+
public String getDeleteWhen() {
return deleteWhen;
}
@@ -1150,6 +1174,14 @@ public class MessageStoreConfig {
this.enableAutoInSyncReplicas = enableAutoInSyncReplicas;
}
+ public boolean isStartupControllerMode() {
+ return startupControllerMode;
+ }
+
+ public void setStartupControllerMode(boolean startupControllerMode) {
+ this.startupControllerMode = startupControllerMode;
+ }
+
public boolean isHaFlowControlEnable() {
return haFlowControlEnable;
}
@@ -1222,6 +1254,14 @@ public class MessageStoreConfig {
this.syncFromMinOffset = syncFromMinOffset;
}
+ public boolean isSyncFromLastFile() {
+ return syncFromLastFile;
+ }
+
+ public void setSyncFromLastFile(boolean syncFromLastFile) {
+ this.syncFromLastFile = syncFromLastFile;
+ }
+
public boolean isEnableLmq() {
return enableLmq;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
index 8c480a017..20f7b2046 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
@@ -43,22 +43,22 @@ public class DefaultHAService implements HAService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- private final AtomicInteger connectionCount = new AtomicInteger(0);
+ protected final AtomicInteger connectionCount = new AtomicInteger(0);
- private final List<HAConnection> connectionList = new LinkedList<>();
+ protected final List<HAConnection> connectionList = new LinkedList<>();
- private AcceptSocketService acceptSocketService;
+ protected AcceptSocketService acceptSocketService;
- private DefaultMessageStore defaultMessageStore;
+ protected DefaultMessageStore defaultMessageStore;
- private WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
- private AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
+ protected WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
+ protected AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
- private GroupTransferService groupTransferService;
+ protected GroupTransferService groupTransferService;
- private DefaultHAClient haClient;
+ protected HAClient haClient;
- private HAConnectionStateNotificationService haConnectionStateNotificationService;
+ protected HAConnectionStateNotificationService haConnectionStateNotificationService;
public DefaultHAService() {
}
@@ -66,7 +66,7 @@ public class DefaultHAService implements HAService {
public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
- new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+ new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
this.haClient = new DefaultHAClient(this.defaultMessageStore);
@@ -125,13 +125,13 @@ public class DefaultHAService implements HAService {
}
}
- public void addConnection(final DefaultHAConnection conn) {
+ public void addConnection(final HAConnection conn) {
synchronized (this.connectionList) {
this.connectionList.add(conn);
}
}
- public void removeConnection(final DefaultHAConnection conn) {
+ public void removeConnection(final HAConnection conn) {
this.haConnectionStateNotificationService.checkConnectionStateAndNotify(conn);
synchronized (this.connectionList) {
this.connectionList.remove(conn);
@@ -148,6 +148,18 @@ public class DefaultHAService implements HAService {
this.haConnectionStateNotificationService.shutdown();
}
+ @Override public boolean changeToMaster(int masterEpoch) {
+ return false;
+ }
+
+ @Override public boolean changeToSlave(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch) {
+ return false;
+ }
+
+ @Override public Set<String> checkSyncStateSetChanged() {
+ return null;
+ }
+
public void destroyConnections() {
synchronized (this.connectionList) {
for (HAConnection c : this.connectionList) {
@@ -180,7 +192,7 @@ public class DefaultHAService implements HAService {
return inSyncNums;
}
- private boolean isInSyncSlave(final long masterPutWhere, HAConnection conn) {
+ protected boolean isInSyncSlave(final long masterPutWhere, HAConnection conn) {
if (masterPutWhere - conn.getSlaveAckOffset() < this.defaultMessageStore.getMessageStoreConfig()
.getHaMaxGapNotInSync()) {
return true;
@@ -241,10 +253,28 @@ public class DefaultHAService implements HAService {
return info;
}
+ /**
+  * Concrete accept service of {@link DefaultHAService}: every accepted channel is wrapped in a
+  * {@link DefaultHAConnection}.
+  */
+ class DefaultAcceptSocketService extends AcceptSocketService {
+
+ public DefaultAcceptSocketService(int port) {
+ super(port);
+ }
+
+ // Builds the connection object for a newly accepted slave channel.
+ @Override protected HAConnection createConnection(SocketChannel sc) throws IOException {
+ return new DefaultHAConnection(DefaultHAService.this, sc);
+ }
+
+ // In a broker container the name is the logger identifier followed by the simple name of
+ // AcceptSocketService; otherwise it is the simple name of DefaultAcceptSocketService.
+ // NOTE(review): the two branches use different class names - confirm this is intended.
+ @Override public String getServiceName() {
+ if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
+ return defaultMessageStore.getBrokerConfig().getLoggerIdentifier() + AcceptSocketService.class.getSimpleName();
+ }
+ return DefaultAcceptSocketService.class.getSimpleName();
+ }
+ }
+
/**
* Listens to slave connections to create {@link HAConnection}.
*/
- class AcceptSocketService extends ServiceThread {
+ protected abstract class AcceptSocketService extends ServiceThread {
private final SocketAddress socketAddressListen;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
@@ -302,7 +332,7 @@ public class DefaultHAService implements HAService {
DefaultHAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
- DefaultHAConnection conn = new DefaultHAConnection(DefaultHAService.this, sc);
+ HAConnection conn = createConnection(sc);
conn.start();
DefaultHAService.this.addConnection(conn);
} catch (Exception e) {
@@ -326,14 +356,8 @@ public class DefaultHAService implements HAService {
}
/**
- * {@inheritDoc}
+ * Create ha connection
*/
- @Override
- public String getServiceName() {
- if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
- return defaultMessageStore.getBrokerConfig().getLoggerIdentifier() + AcceptSocketService.class.getSimpleName();
- }
- return AcceptSocketService.class.getSimpleName();
- }
+ protected abstract HAConnection createConnection(final SocketChannel sc) throws IOException;
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 5f714b90e..20d72c499 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store.ha;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
@@ -48,6 +49,26 @@ public interface HAService {
*/
void shutdown();
+ /**
+ * Change to master state
+ * @param masterEpoch the new masterEpoch
+ */
+ boolean changeToMaster(int masterEpoch);
+
+ /**
+ * Change to slave state
+ * @param newMasterAddr new master addr
+ * @param newHaMasterAddr new master HA addr
+ * @param newMasterEpoch new masterEpoch
+ */
+ boolean changeToSlave(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch);
+
+ /**
+ * Check whether the syncStateSet changed
+ * @return new syncStateSet
+ */
+ Set<String> checkSyncStateSetChanged();
+
/**
* Update master address
*
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
new file mode 100644
index 000000000..3ddd367a7
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+
+
+ /**
+ * Transfer header buffer size. Schema: state ordinal + additional info(maxOffset or flag)
+ * If in handshake state, we reuse additional info as the flag -- isSyncFromLastFile.
+ */
+ public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+ private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+ private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
+ private final AtomicReference<String> masterAddress = new AtomicReference<>();
+ private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+ private final AutoSwitchHAService haService;
+ private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+ private final DefaultMessageStore messageStore;
+ private final EpochFileCache epochCache;
+
+ private SocketChannel socketChannel;
+ private Selector selector;
+ private AbstractHAReader haReader;
+ private HAWriter haWriter;
+ private FlowMonitor flowMonitor;
+ /**
+ * last time that slave reads data from master.
+ */
+ private long lastReadTimestamp;
+ /**
+ * last time that slave reports offset to master.
+ */
+ private long lastWriteTimestamp;
+
+ private long currentReportedOffset;
+ private int processPosition;
+ private volatile HAConnectionState currentState;
+ /**
+ * Current epoch
+ */
+ private volatile long currentReceivedEpoch;
+
+ /**
+ * Confirm offset = min(localMaxOffset, master confirm offset).
+ */
+ private volatile long confirmOffset;
+
+ public static final int SYNC_FROM_LAST_FILE = -1;
+
+ public static final int SYNC_FROM_FIRST_FILE = -2;
+
+ public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
+ EpochFileCache epochCache) throws IOException {
+ this.haService = haService;
+ this.messageStore = defaultMessageStore;
+ this.epochCache = epochCache;
+ init();
+ }
+
+ /**
+  * (Re)initializes the client: opens a new selector, creates the flow monitor, reader and
+  * writer, registers the timestamp hooks, and resets state, offsets and timestamps.
+  * Called from the constructor and from reOpen().
+  *
+  * @throws IOException declared for selector creation via RemotingUtil.openSelector()
+  */
+ public void init() throws IOException {
+ this.selector = RemotingUtil.openSelector();
+ this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
+ this.haReader = new HAClientReader();
+ // Read hook: on a positive read size, feed the flow monitor and refresh lastReadTimestamp.
+ haReader.registerHook(readSize -> {
+ if (readSize > 0) {
+ AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+ lastReadTimestamp = System.currentTimeMillis();
+ }
+ });
+ this.haWriter = new HAWriter();
+ // Write hook: on a positive write size, refresh lastWriteTimestamp (read by isTimeToReportOffset()).
+ haWriter.registerHook(writeSize -> {
+ if (writeSize > 0) {
+ lastWriteTimestamp = System.currentTimeMillis();
+ }
+ });
+ changeCurrentState(HAConnectionState.READY);
+ // -1 is the initial value for both the received epoch and the confirm offset.
+ this.currentReceivedEpoch = -1;
+ this.currentReportedOffset = 0;
+ this.processPosition = 0;
+ this.confirmOffset = -1;
+ this.lastReadTimestamp = System.currentTimeMillis();
+ this.lastWriteTimestamp = System.currentTimeMillis();
+ }
+
+ public void reOpen() throws IOException {
+ shutdown();
+ init();
+ }
+
+ @Override public String getServiceName() {
+ return AutoSwitchHAClient.class.getSimpleName();
+ }
+
+ @Override public void updateMasterAddress(String newAddress) {
+ String currentAddr = this.masterAddress.get();
+ if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+ LOGGER.info("update master address, OLD: " + currentAddr + " NEW: " + newAddress);
+ }
+ }
+
+ @Override public void updateHaMasterAddress(String newAddress) {
+ String currentAddr = this.masterHaAddress.get();
+ if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+ LOGGER.info("update master ha address, OLD: " + currentAddr + " NEW: " + newAddress);
+ }
+ }
+
+ @Override public String getMasterAddress() {
+ return this.masterAddress.get();
+ }
+
+ @Override public String getHaMasterAddress() {
+ return this.masterHaAddress.get();
+ }
+
+ @Override public long getLastReadTimestamp() {
+ return this.lastReadTimestamp;
+ }
+
+ @Override public long getLastWriteTimestamp() {
+ return this.lastWriteTimestamp;
+ }
+
+ @Override public HAConnectionState getCurrentState() {
+ return this.currentState;
+ }
+
+ @Override public void changeCurrentState(HAConnectionState haConnectionState) {
+ LOGGER.info("change state to {}", haConnectionState);
+ this.currentState = haConnectionState;
+ }
+
+ public void closeMasterAndWait() {
+ this.closeMaster();
+ this.waitForRunning(1000 * 5);
+ }
+
+ /**
+  * Closes the channel to the master, if one is open, and resets the read-side bookkeeping.
+  * Does nothing when there is no channel.
+  */
+ @Override public void closeMaster() {
+ if (null != this.socketChannel) {
+ try {
+ // Cancel the channel's registration with the selector before closing it.
+ SelectionKey sk = this.socketChannel.keyFor(this.selector);
+ if (sk != null) {
+ sk.cancel();
+ }
+
+ this.socketChannel.close();
+ this.socketChannel = null;
+
+ LOGGER.info("AutoSwitchHAClient close connection with master {}", this.masterHaAddress.get());
+ this.changeCurrentState(HAConnectionState.READY);
+ } catch (IOException e) {
+ // NOTE(review): if close() throws, socketChannel stays non-null and the state is not
+ // reset to READY - confirm this is acceptable.
+ LOGGER.warn("CloseMaster exception. ", e);
+ }
+
+ // Reset the timestamps/positions and rewind the read buffer to its full capacity.
+ this.lastReadTimestamp = 0;
+ this.processPosition = 0;
+
+ this.byteBufferRead.position(0);
+ this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+ }
+ }
+
+ @Override public long getTransferredByteInSecond() {
+ return this.flowMonitor.getTransferredByteInSecond();
+ }
+
+ /**
+  * Stops the client: marks the state SHUTDOWN, stops the flow monitor and the service thread,
+  * closes the master connection and finally closes the selector.
+  */
+ @Override public void shutdown() {
+ changeCurrentState(HAConnectionState.SHUTDOWN);
+ // Shutdown thread firstly
+ this.flowMonitor.shutdown();
+ super.shutdown();
+
+ // NOTE(review): closeMaster() switches the state back to READY when a channel was open,
+ // so the state may not remain SHUTDOWN after this call - confirm callers do not rely on it.
+ closeMaster();
+ try {
+ this.selector.close();
+ } catch (IOException e) {
+ LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
+ }
+ }
+
+ /**
+  * Tells whether the time since the last successful write to the master exceeds the
+  * configured heartbeat interval.
+  */
+ private boolean isTimeToReportOffset() {
+     final long sinceLastWrite = this.messageStore.now() - this.lastWriteTimestamp;
+     final long heartbeatInterval = this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+     return sinceLastWrite > heartbeatInterval;
+ }
+
+ /**
+  * Fills the transfer header with the HANDSHAKE state ordinal (int) followed by a long flag,
+  * SYNC_FROM_LAST_FILE or SYNC_FROM_FIRST_FILE depending on the isSyncFromLastFile() config,
+  * and writes it to the master channel.
+  *
+  * @return the result of haWriter.write(...)
+  */
+ private boolean sendHandshakeHeader() {
+ // Rewind the reusable header buffer before filling it.
+ this.transferHeaderBuffer.position(0);
+ this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+ this.transferHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
+ if (this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile()) {
+ this.transferHeaderBuffer.putLong(SYNC_FROM_LAST_FILE);
+ } else {
+ this.transferHeaderBuffer.putLong(SYNC_FROM_FIRST_FILE);
+ }
+ this.transferHeaderBuffer.flip();
+ return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+ }
+
+ /**
+  * Performs one handshake round with the master: sends the handshake header once, waits up
+  * to five seconds on the selector, then lets the reader consume the master's reply.
+  * When sending or reading fails, the connection is closed via closeMasterAndWait().
+  *
+  * @throws IOException if the selector or the read fails with an I/O error
+  */
+ private void handshakeWithMaster() throws IOException {
+     // Send the header exactly once; a second copy would be a redundant handshake request.
+     if (!this.sendHandshakeHeader()) {
+         closeMasterAndWait();
+         // The connection has just been closed, so do not go on to select/read on it.
+         return;
+     }
+
+     this.selector.select(5000);
+
+     if (!this.haReader.read(this.socketChannel, this.byteBufferRead)) {
+         closeMasterAndWait();
+     }
+ }
+
+ /**
+  * Writes a transfer header carrying the current state ordinal (int) and the given offset
+  * (long) to the master channel.
+  *
+  * @param offsetToReport offset value placed in the header
+  * @return the result of haWriter.write(...)
+  */
+ private boolean reportSlaveOffset(final long offsetToReport) {
+ // Rewind the reusable header buffer before filling it.
+ this.transferHeaderBuffer.position(0);
+ this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+ this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+ this.transferHeaderBuffer.putLong(offsetToReport);
+ this.transferHeaderBuffer.flip();
+ return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+ }
+
+ /**
+  * Reports the local max physical offset to the master when it has moved past the last
+  * reported offset; otherwise nothing is sent.
+  *
+  * @return true when nothing had to be sent, else the result of reportSlaveOffset(...)
+  */
+ private boolean reportSlaveMaxOffset() {
+     final long localMaxOffset = this.messageStore.getMaxPhyOffset();
+     if (localMaxOffset <= this.currentReportedOffset) {
+         return true;
+     }
+     this.currentReportedOffset = localMaxOffset;
+     return reportSlaveOffset(this.currentReportedOffset);
+ }
+
+ /**
+  * Connects to the master HA address when no channel is currently open. On success the
+  * channel is registered for OP_READ and the state changes to HANDSHAKE.
+  *
+  * @return true if a channel to the master exists after the call
+  * @throws ClosedChannelException declared for the channel registration
+  */
+ public boolean connectMaster() throws ClosedChannelException {
+ if (null == this.socketChannel) {
+ String addr = this.masterHaAddress.get();
+ if (addr != null) {
+ SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+ this.socketChannel = RemotingUtil.connect(socketAddress);
+ if (this.socketChannel != null) {
+ this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+ LOGGER.info("AutoSwitchHAClient connect to master {}", addr);
+ changeCurrentState(HAConnectionState.HANDSHAKE);
+ }
+ }
+ // These two fields are refreshed on every attempt, whether or not the connect succeeded.
+ this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
+ this.lastReadTimestamp = System.currentTimeMillis();
+ }
+ return this.socketChannel != null;
+ }
+
+ /**
+  * One transfer round: reports the current offset if the heartbeat interval has elapsed,
+  * waits up to one second on the selector, reads from the master, then reports the local
+  * max offset if it advanced.
+  *
+  * @return false as soon as a write or read step reports failure, otherwise the result of
+  *         reportSlaveMaxOffset()
+  * @throws IOException if the selector or the read fails with an I/O error
+  */
+ private boolean transferFromMaster() throws IOException {
+ boolean result;
+ if (isTimeToReportOffset()) {
+ LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
+ result = reportSlaveOffset(this.currentReportedOffset);
+ if (!result) {
+ return false;
+ }
+ }
+
+ this.selector.select(1000);
+
+ result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+ if (!result) {
+ return false;
+ }
+
+ return this.reportSlaveMaxOffset();
+ }
+
+ /**
+  * Service loop: starts the flow monitor and then, until stopped, dispatches on the current
+  * connection state (READY -> connect, HANDSHAKE -> handshake, TRANSFER -> transfer).
+  * Only the TRANSFER branch falls through to the housekeeping check at the end of the loop;
+  * every other branch continues or returns. Any exception closes the master connection.
+  */
+ @Override public void run() {
+ LOGGER.info(this.getServiceName() + " service started");
+
+ this.flowMonitor.start();
+ while (!this.isStopped()) {
+ try {
+ switch (this.currentState) {
+ case SHUTDOWN:
+ return;
+ case READY:
+ // Truncate invalid msg first
+ final long truncateOffset = AutoSwitchHAClient.this.haService.truncateInvalidMsg();
+ // A non-negative result is also applied to the epoch cache.
+ if (truncateOffset >= 0) {
+ AutoSwitchHAClient.this.epochCache.truncateSuffixByOffset(truncateOffset);
+ }
+ if (!connectMaster()) {
+ LOGGER.warn("AutoSwitchHAClient connect to master {} failed", this.masterHaAddress.get());
+ waitForRunning(1000 * 5);
+ }
+ continue;
+ case HANDSHAKE:
+ handshakeWithMaster();
+ continue;
+ case TRANSFER:
+ if (!transferFromMaster()) {
+ closeMasterAndWait();
+ continue;
+ }
+ break;
+ case SUSPEND:
+ default:
+ waitForRunning(1000 * 5);
+ continue;
+ }
+ // Housekeeping: drop the connection when nothing has been read for longer than
+ // the configured housekeeping interval.
+ long interval = this.messageStore.now() - this.lastReadTimestamp;
+ if (interval > this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+ LOGGER.warn("AutoSwitchHAClient, housekeeping, found this connection[" + this.masterHaAddress
+ + "] expired, " + interval);
+ closeMaster();
+ LOGGER.warn("AutoSwitchHAClient, master not response some time, so close connection");
+ }
+ } catch (Exception e) {
+ LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+ closeMasterAndWait();
+ }
+ }
+
+ }
+
+    /**
+     * Aligns the slave log with the master before entering the TRANSFER state.
+     * The master and slave epoch entries are compared to find the consistent point; the local
+     * commit log and epoch cache are truncated to it, and the resulting offset is reported to the master.
+     *
+     * @param masterEpochEntries epoch entries received from the master in the handshake
+     * @param masterEndOffset end offset used for the master's last epoch entry
+     * @return false if no consistent point exists, the truncation fails, or the report fails
+     */
+    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) {
+        final long resumeOffset;
+        if (this.epochCache.getEntrySize() != 0) {
+            final EpochFileCache masterView = new EpochFileCache();
+            masterView.initCacheFromEntries(masterEpochEntries);
+            masterView.setLastEpochEntryEndOffset(masterEndOffset);
+
+            final List<EpochEntry> localEpochEntries = this.epochCache.getAllEntries();
+            final EpochFileCache localView = new EpochFileCache();
+            localView.initCacheFromEntries(localEpochEntries);
+            localView.setLastEpochEntryEndOffset(this.messageStore.getMaxPhyOffset());
+
+            final long consistentPoint = localView.findConsistentPoint(masterView);
+            if (consistentPoint < 0) {
+                // A negative value means the two epoch histories share no consistent point.
+                LOGGER.error("Failed to find a consistent point between masterEpoch:{} and slaveEpoch:{}", masterEpochEntries, localEpochEntries);
+                return false;
+            }
+            if (!this.messageStore.truncateFiles(consistentPoint)) {
+                LOGGER.error("Failed to truncate slave log to {}", consistentPoint);
+                return false;
+            }
+            this.epochCache.truncateSuffixByOffset(consistentPoint);
+            LOGGER.info("Truncate slave log to {} success, change to transfer state", consistentPoint);
+            resumeOffset = consistentPoint;
+        } else {
+            // An empty epoch cache means this broker is a new replica, so there is nothing to truncate.
+            LOGGER.info("Slave local epochCache is empty, skip truncate log");
+            resumeOffset = 0;
+        }
+
+        changeCurrentState(HAConnectionState.TRANSFER);
+        this.currentReportedOffset = resumeOffset;
+
+        if (reportSlaveMaxOffset()) {
+            return true;
+        }
+        LOGGER.error("AutoSwitchHAClient report max offset to master failed");
+        return false;
+    }
+
+ /**
+ * Slave-side reader that parses the messages pushed by the master.
+ * Every message starts with a header of AutoSwitchHAConnection.MSG_HEADER_SIZE bytes laid out as
+ * state(int) + bodySize(int) + masterOffset(long) + masterEpoch(int) + masterEpochStartOffset(long) + confirmOffset(long),
+ * followed by bodySize bytes of body.
+ */
+ class HAClientReader extends AbstractHAReader {
+
+ /**
+ * Consumes every complete message currently held in the buffer, starting at processPosition.
+ *
+ * @param byteBufferRead read buffer; its position marks the end of the bytes received so far
+ * @return false when truncation fails in the handshake, when the pushed offset differs from the local
+ * max physical offset, or when reporting the slave offset fails; true otherwise
+ */
+ @Override
+ protected boolean processReadResult(ByteBuffer byteBufferRead) {
+ // Remember where the socket data ends; absolute reads below must not lose this position.
+ int readSocketPos = byteBufferRead.position();
+
+ while (true) {
+ int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition;
+ if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
+ int processPosition = AutoSwitchHAClient.this.processPosition;
+ // Absolute reads: decode the header without moving the buffer position.
+ int masterState = byteBufferRead.getInt(processPosition);
+ int bodySize = byteBufferRead.getInt(processPosition + 4);
+ long masterOffset = byteBufferRead.getLong(processPosition + 4 + 4);
+ int masterEpoch = byteBufferRead.getInt(processPosition + 4 + 4 + 8);
+ long masterEpochStartOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4);
+ long confirmOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4 + 8);
+
+ if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) {
+ // The message belongs to another state: skip header and body, then return.
+ // NOTE(review): the skip happens even when the body has not fully arrived yet, which can move
+ // processPosition past the buffer position - confirm this is intended.
+ AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+ AutoSwitchHAClient.this.waitForRunning(1);
+ LOGGER.error("State not matched, masterState:{}, slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}",
+ masterState, AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
+ return true;
+ }
+ LOGGER.info("Receive master msg, masterState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}",
+ HAConnectionState.values()[masterState], bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
+
+ // Only process once header and body are both completely buffered.
+ if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize)) {
+ switch (AutoSwitchHAClient.this.currentState) {
+ case HANDSHAKE:
+ AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE;
+ // Truncate log
+ // The handshake body is a list of (epoch int, startOffset long) entries.
+ int entrySize = AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
+ final int entryNums = bodySize / entrySize;
+ final ArrayList<EpochEntry> epochEntries = new ArrayList<>(entryNums);
+ for (int i = 0; i < entryNums; i++) {
+ int epoch = byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
+ long startOffset = byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize + 4);
+ epochEntries.add(new EpochEntry(epoch, startOffset));
+ }
+ byteBufferRead.position(readSocketPos);
+ AutoSwitchHAClient.this.processPosition += bodySize;
+ LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries);
+ if (!doTruncate(epochEntries, masterOffset)) {
+ waitForRunning(1000 * 2);
+ LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state");
+ return false;
+ }
+ break;
+ case TRANSFER:
+ // Copy the body out with relative reads, then restore the socket position.
+ byte[] bodyData = new byte[bodySize];
+ byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE);
+ byteBufferRead.get(bodyData);
+ byteBufferRead.position(readSocketPos);
+ AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+
+ // When the slave already holds data, the pushed offset must continue exactly at its max offset.
+ long slavePhyOffset = AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
+ if (slavePhyOffset != 0) {
+ if (slavePhyOffset != masterOffset) {
+ LOGGER.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ + slavePhyOffset + " MASTER: " + masterOffset);
+ return false;
+ }
+ }
+
+ // If epoch changed
+ if (masterEpoch != AutoSwitchHAClient.this.currentReceivedEpoch) {
+ AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
+ AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, masterEpochStartOffset));
+ }
+ // The local confirm offset never exceeds what the slave has actually stored.
+ AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
+
+ if (bodySize > 0) {
+ final DefaultMessageStore messageStore = AutoSwitchHAClient.this.messageStore;
+ // NOTE(review): a failed append is neither logged nor reported here - confirm this is intended.
+ if (messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length)) {
+ LOGGER.info("Slave append master log success, from {}, size {}, epoch:{}", masterOffset, bodySize, masterEpoch);
+ }
+ }
+
+ if (!reportSlaveMaxOffset()) {
+ LOGGER.error("AutoSwitchHAClient report max offset to master failed");
+ return false;
+ }
+ break;
+ default:
+ // NOTE(review): nothing is consumed in this branch, so the loop re-reads the same header - verify
+ // that no state other than HANDSHAKE/TRANSFER can reach here.
+ break;
+ }
+ continue;
+ }
+ }
+
+ // Buffer full: move the unprocessed tail to the front so further socket reads have room.
+ if (!byteBufferRead.hasRemaining()) {
+ byteBufferRead.position(AutoSwitchHAClient.this.processPosition);
+ byteBufferRead.compact();
+ AutoSwitchHAClient.this.processPosition = 0;
+ }
+
+ break;
+ }
+ return true;
+ }
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
new file mode 100644
index 000000000..0b7ce6b39
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+ /**
+ * Header protocol in syncing msg from master.
+ * Format: current state + body size + offset + epoch + epochStartOffset + additionalInfo(confirmOffset).
+ * If the msg is a handshake msg, the body size = EpochEntrySize * EpochEntryNums, and the offset is the maxOffset in master.
+ */
+ public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8;
+ public static final int EPOCH_ENTRY_SIZE = 12;
+ private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private final AutoSwitchHAService haService;
+ private final SocketChannel socketChannel;
+ private final String clientAddress;
+ private final EpochFileCache epochCache;
+ private final AbstractWriteSocketService writeSocketService;
+ private final ReadSocketService readSocketService;
+ private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+ private volatile long slaveRequestOffset = -1;
+ private volatile long slaveAckOffset = -1;
+ /**
+ * Whether the slave has already sent a handshake message
+ */
+ private volatile boolean isSlaveSendHandshake = false;
+ private volatile int currentTransferEpoch = -1;
+ private volatile long currentTransferEpochEndOffset = 0;
+ private volatile boolean isSyncFromLastFile = false;
+ private final FlowMonitor flowMonitor;
+
+ /**
+ * Creates the master-side connection for one slave socket.
+ * Switches the channel to non-blocking mode, tunes the socket options, creates the read and write
+ * services for the channel, increments the HA service's connection count and creates the flow monitor.
+ *
+ * @param haService owning HA service
+ * @param socketChannel accepted channel of the slave
+ * @param epochCache epoch cache used for the handshake and for epoch-bounded transfer
+ * @throws IOException if configuring the channel or creating a socket service fails
+ */
+ public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+ EpochFileCache epochCache) throws IOException {
+ this.haService = haService;
+ this.socketChannel = socketChannel;
+ this.epochCache = epochCache;
+ this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString();
+ this.socketChannel.configureBlocking(false);
+ this.socketChannel.socket().setSoLinger(false, -1);
+ this.socketChannel.socket().setTcpNoDelay(true);
+ this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+ this.socketChannel.socket().setSendBufferSize(1024 * 64);
+ this.writeSocketService = new WriteSocketService(this.socketChannel);
+ this.readSocketService = new ReadSocketService(this.socketChannel);
+ this.haService.getConnectionCount().incrementAndGet();
+ this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+ }
+
+ /**
+ * Puts the connection into the HANDSHAKE state and starts the flow monitor, the read service and the write service, in that order.
+ */
+ @Override public void start() {
+ changeCurrentState(HAConnectionState.HANDSHAKE);
+ this.flowMonitor.start();
+ this.readSocketService.start();
+ this.writeSocketService.start();
+ }
+
+ /**
+ * Marks the connection as SHUTDOWN, shuts down the flow monitor and both socket services, then closes the channel.
+ */
+ @Override public void shutdown() {
+ changeCurrentState(HAConnectionState.SHUTDOWN);
+ this.flowMonitor.shutdown(true);
+ this.writeSocketService.shutdown(true);
+ this.readSocketService.shutdown(true);
+ this.close();
+ }
+
+    /**
+     * Closes the underlying socket channel, if there is one. A failure to close is logged and not rethrown.
+     */
+    @Override public void close() {
+        if (this.socketChannel == null) {
+            return;
+        }
+        try {
+            this.socketChannel.close();
+        } catch (final IOException e) {
+            LOGGER.error("", e);
+        }
+    }
+
+ /**
+ * Logs the transition and stores the new connection state.
+ *
+ * @param connectionState state to switch to
+ */
+ public void changeCurrentState(HAConnectionState connectionState) {
+ LOGGER.info("change state to {}", connectionState);
+ this.currentState = connectionState;
+ }
+
+ /** Returns the current state of this connection. */
+ @Override public HAConnectionState getCurrentState() {
+ return currentState;
+ }
+
+ /** Returns the socket channel this connection was created with. */
+ @Override public SocketChannel getSocketChannel() {
+ return socketChannel;
+ }
+
+ /** Returns the remote socket address of the slave, captured as a string when the connection was created. */
+ @Override public String getClientAddress() {
+ return clientAddress;
+ }
+
+ /** Returns the max offset most recently reported by the slave in a TRANSFER message, or -1 if none was received yet. */
+ @Override public long getSlaveAckOffset() {
+ return slaveAckOffset;
+ }
+
+ /** Returns the transferred-bytes-per-second figure maintained by the flow monitor. */
+ @Override public long getTransferredByteInSecond() {
+ return flowMonitor.getTransferredByteInSecond();
+ }
+
+ /** Returns the offset from which the write service will transfer next (-1 until it has been initialised). */
+ @Override public long getTransferFromWhere() {
+ return this.writeSocketService.getNextTransferFromWhere();
+ }
+
+ private void changeTransferEpochToNext(final EpochEntry entry) {
+ this.currentTransferEpoch = entry.getEpoch();
+ this.currentTransferEpochEndOffset = entry.getEndOffset();
+ if (entry.getEpoch() == this.epochCache.lastEpoch()) {
+ // Use -1 to stand for Long.max
+ this.currentTransferEpochEndOffset = -1;
+ }
+ }
+
+ class ReadSocketService extends ServiceThread {
+ private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+ private final Selector selector;
+ private final SocketChannel socketChannel;
+ private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+ private final AbstractHAReader haReader;
+ private int processPosition = 0;
+ private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+ /**
+ * Opens a selector, registers the channel on it for OP_READ, marks the thread as daemon and installs
+ * an HAServerReader whose hook refreshes lastReadTimestamp whenever bytes were actually read.
+ *
+ * @param socketChannel channel of the slave to read from
+ * @throws IOException if the selector cannot be opened or the channel cannot be registered
+ */
+ public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+ this.selector = RemotingUtil.openSelector();
+ this.socketChannel = socketChannel;
+ this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+ this.setDaemon(true);
+ haReader = new HAServerReader();
+ haReader.registerHook(readSize -> {
+ if (readSize > 0) {
+ ReadSocketService.this.lastReadTimestamp =
+ haService.getDefaultMessageStore().getSystemClock().now();
+ }
+ });
+ }
+
+        /**
+         * Read loop of the master side. Each round waits up to one second on the selector, lets the
+         * reader consume the slave's messages, and ends the loop when reading fails, when nothing has
+         * been read for longer than the housekeeping interval, or when an exception is thrown.
+         * After the loop the connection is torn down: both services are stopped, the connection is
+         * removed from the HA service, the connection count is decremented, and the selector and the
+         * channel are closed.
+         */
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            // Close the two resources independently so that a failure to close the selector
+            // can no longer leave the socket channel open.
+            try {
+                this.selector.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+            try {
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        /**
+         * Returns the thread/service name. Inside a broker container the broker's logger identifier is
+         * prepended, matching WriteSocketService, so that threads of different brokers can be told apart.
+         */
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + ReadSocketService.class.getSimpleName();
+            }
+            return ReadSocketService.class.getSimpleName();
+        }
+
+ class HAServerReader extends AbstractHAReader {
+ @Override
+ protected boolean processReadResult(ByteBuffer byteBufferRead) {
+ while (true) {
+ int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+ if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+ int readPosition = ReadSocketService.this.processPosition;
+ HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+ switch (slaveState) {
+ case HANDSHAKE:
+ isSlaveSendHandshake = true;
+ long syncFromLastFileFlag = byteBufferRead.getLong(readPosition + 4);
+ if (syncFromLastFileFlag == AutoSwitchHAClient.SYNC_FROM_LAST_FILE) {
+ AutoSwitchHAConnection.this.isSyncFromLastFile = true;
+ LOGGER.info("Slave request sync from lastFile");
+ }
+ ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+ break;
+ case TRANSFER:
+ long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+ ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+ slaveAckOffset = slaveMaxOffset;
+ if (slaveRequestOffset < 0) {
+ slaveRequestOffset = slaveMaxOffset;
+ }
+ LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
+ AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+ break;
+ default:
+ LOGGER.error("Current state illegal {}", currentState);
+ break;
+ }
+
+ if (!slaveState.equals(currentState)) {
+ LOGGER.warn("Master change state from {} to {}", currentState, slaveState);
+ changeCurrentState(slaveState);
+ }
+ continue;
+ }
+
+ if (!byteBufferRead.hasRemaining()) {
+ byteBufferRead.position(ReadSocketService.this.processPosition);
+ byteBufferRead.compact();
+ ReadSocketService.this.processPosition = 0;
+ }
+ break;
+ }
+
+ return true;
+ }
+ }
+ }
+
+    /**
+     * Write service that transfers commit-log data, selected as a SelectMappedBufferResult,
+     * to the slave. The selected result is held until its body has been completely written.
+     */
+    class WriteSocketService extends AbstractWriteSocketService {
+        /** Commit-log slice currently being transferred, or null when nothing is pending. */
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        /**
+         * Selects the commit-log data starting at nextTransferFromWhere.
+         *
+         * @return the size of the selected data, or 0 when nothing is available
+         */
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null) {
+                return 0;
+            }
+            if (selectResult.getSize() <= 0) {
+                // An empty result is discarded, so release it here instead of dropping the reference.
+                selectResult.release();
+                return 0;
+            }
+            // A result left over from an abandoned round must be released before it is replaced.
+            releaseData();
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        /** Releases the pending commit-log slice, if any, and forgets it. Safe to call when nothing is pending. */
+        @Override
+        protected void releaseData() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+                this.selectMappedBufferResult = null;
+            }
+        }
+
+        /**
+         * Writes the prepared header and then the pending body, if there is one.
+         *
+         * @param maxTransferSize limit applied to the body buffer when non-negative; a negative value
+         * keeps the existing limit in order to continue an unfinished write
+         * @return true when header and body were written completely
+         */
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        /** Releases whatever is still pending when the service stops. */
+        @Override
+        protected void onStop() {
+            releaseData();
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+ abstract class AbstractWriteSocketService extends ServiceThread {
+ protected final Selector selector;
+ protected final SocketChannel socketChannel;
+ protected final HAWriter haWriter;
+
+ protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+ // Store master epochFileCache: (Epoch + startOffset) * 1000
+ private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+ protected long nextTransferFromWhere = -1;
+ protected boolean lastWriteOver = true;
+ protected long lastWriteTimestamp = System.currentTimeMillis();
+ protected long lastPrintTimestamp = System.currentTimeMillis();
+ protected long transferOffset = 0;
+
+ /**
+ * Opens a selector, registers the channel on it for OP_WRITE, marks the thread as daemon and installs
+ * an HAWriter whose hook feeds the flow monitor and refreshes lastWriteTimestamp whenever bytes were written.
+ *
+ * @param socketChannel channel of the slave to write to
+ * @throws IOException if the selector cannot be opened or the channel cannot be registered
+ */
+ public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+ this.selector = RemotingUtil.openSelector();
+ this.socketChannel = socketChannel;
+ this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+ this.setDaemon(true);
+ haWriter = new HAWriter();
+ haWriter.registerHook(writeSize -> {
+ flowMonitor.addByteCountTransferred(writeSize);
+ if (writeSize > 0) {
+ AbstractWriteSocketService.this.lastWriteTimestamp =
+ haService.getDefaultMessageStore().getSystemClock().now();
+ }
+ });
+ }
+
+ /** Returns the offset of the next data to transfer; -1 until the TRANSFER state has initialised it. */
+ public long getNextTransferFromWhere() {
+ return this.nextTransferFromWhere;
+ }
+
+        /**
+         * Prepares the handshake message: the header goes into byteBufferHeader and the master's epoch
+         * entries, as (epoch int, startOffset long) pairs, go into handShakeBuffer.
+         * The body size announced in the header always equals the number of bytes actually written.
+         *
+         * @return false, leaving both buffers untouched, when the entries do not fit into handShakeBuffer;
+         * true when both buffers are ready to be written
+         */
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+
+            // Null entries are not written, so they must not be counted in the announced body size either.
+            int entryCount = 0;
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    entryCount++;
+                }
+            }
+            final int bodySize = entryCount * EPOCH_ENTRY_SIZE;
+            if (bodySize > this.handShakeBuffer.capacity()) {
+                // Setting a limit beyond the capacity would throw; report the failure to the caller instead.
+                LOGGER.error("Master epoch entries need {} bytes, which exceeds the handshake buffer capacity {}",
+                    bodySize, this.handShakeBuffer.capacity());
+                return false;
+            }
+
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // EpochStartOffset (not needed in handshake)
+            this.byteBufferHeader.putLong(0L);
+            // Additional info (not needed in handshake)
+            this.byteBufferHeader.putLong(0L);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(bodySize);
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+ private boolean handshakeWithSlave() {
+ // Write Header
+ boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+ if (!result) {
+ return false;
+ }
+
+ // Write Body
+ return this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+ }
+
+ // Normal transfer method
+ /**
+ * Fills byteBufferHeader with the header of a transfer message for the epoch currently being transferred:
+ * state, body size, offset, epoch, epoch start offset and the HA service's confirm offset.
+ * NOTE(review): when the epoch entry cannot be found this only logs and returns, leaving the header buffer
+ * untouched, while the callers in this class go on to call transferData - confirm that path is safe.
+ *
+ * @param nextOffset offset of the data carried by the message
+ * @param bodySize number of body bytes that follow the header (0 for a heartbeat)
+ */
+ private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+ final EpochEntry entry = AutoSwitchHAConnection.this.epochCache.getEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+ if (entry == null) {
+ LOGGER.error("Failed to find epochEntry with epoch {} when build msg header", AutoSwitchHAConnection.this.currentTransferEpoch);
+ return;
+ }
+ // Build Header
+ this.byteBufferHeader.position(0);
+ this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+ // State
+ this.byteBufferHeader.putInt(currentState.ordinal());
+ // Body size
+ this.byteBufferHeader.putInt(bodySize);
+ // Offset
+ this.byteBufferHeader.putLong(nextOffset);
+ // Epoch
+ this.byteBufferHeader.putInt(entry.getEpoch());
+ // EpochStartOffset
+ this.byteBufferHeader.putLong(entry.getStartOffset());
+ // Additional info(confirm offset)
+ final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+ this.byteBufferHeader.putLong(confirmOffset);
+ this.byteBufferHeader.flip();
+ LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, epochStartOffset:{}, confirmOffset:{}",
+ currentState, bodySize, nextOffset, entry.getEpoch(), entry.getStartOffset(), confirmOffset);
+ }
+
+        /**
+         * Performs one round of the TRANSFER state.
+         * First any unfinished write is completed, or a heartbeat (empty body) is sent when the last
+         * write is older than the heartbeat interval. Then the next chunk of data is selected, capped by
+         * the batch size, by flow control and by the end of the current epoch, and sent with a fresh header.
+         * Every path that abandons a selected chunk releases it first.
+         *
+         * @throws Exception if transferring the data fails
+         */
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (interval > haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    // Heartbeat: a header with an empty body.
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means to continue transfer remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size <= 0) {
+                // Nothing to send yet; wait until new data is announced.
+                haService.getWaitNotifyObject().allWaitForRunning(100);
+                return;
+            }
+
+            if (size > haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                size = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+            }
+            int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+            if (size > canTransferMaxBytes) {
+                if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                    LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                        String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                        String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                    lastPrintTimestamp = System.currentTimeMillis();
+                }
+                size = canTransferMaxBytes;
+            }
+            if (size <= 0) {
+                this.releaseData();
+                this.waitForRunning(100);
+                return;
+            }
+
+            // We must ensure that the transmitted logs are within the same epoch
+            // If currentEpochEndOffset == -1, means that currentTransferEpoch = last epoch, so the endOffset = Long.max
+            final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;
+            if (currentEpochEndOffset != -1 && this.nextTransferFromWhere + size > currentEpochEndOffset) {
+                final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                if (epochEntry == null) {
+                    LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                    // The selected chunk is abandoned for this round; release it so it is not leaked
+                    // when the next round selects again.
+                    this.releaseData();
+                    waitForRunning(100);
+                    return;
+                }
+                // Only send up to the end of the current epoch; the rest goes out under the next one.
+                size = (int) (currentEpochEndOffset - this.nextTransferFromWhere);
+                changeTransferEpochToNext(epochEntry);
+            }
+
+            this.transferOffset = this.nextTransferFromWhere;
+            this.nextTransferFromWhere += size;
+
+            // Build Header
+            buildTransferHeaderBuffer(this.transferOffset, size);
+
+            this.lastWriteOver = this.transferData(size);
+        }
+
+ /**
+ * Write loop of the master side. Each round waits up to one second on the selector and then acts on
+ * the connection state: HANDSHAKE sends the epoch handshake once the slave has asked for it, TRANSFER
+ * initialises the start offset and epoch on first use and then transfers data; any other state raises
+ * an exception. An exception ends the loop. After the loop the pending data is released through
+ * onStop, both services are stopped, the connection is removed from the HA service, and the selector
+ * and the channel are closed.
+ */
+ @Override public void run() {
+ AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.selector.select(1000);
+
+ switch (currentState) {
+ case HANDSHAKE:
+ // Wait until the slave sends its handshake msg to master.
+ if (!isSlaveSendHandshake) {
+ this.waitForRunning(10);
+ continue;
+ }
+
+ // Only rebuild the buffers when the previous handshake write has completed.
+ if (this.lastWriteOver) {
+ if (!buildHandshakeBuffer()) {
+ LOGGER.error("AutoSwitchHAConnection build handshake buffer failed");
+ this.waitForRunning(5000);
+ continue;
+ }
+ }
+
+ this.lastWriteOver = handshakeWithSlave();
+ if (this.lastWriteOver) {
+ // change flag to {false} to wait for slave notification
+ isSlaveSendHandshake = false;
+ }
+ break;
+ case TRANSFER:
+ // Wait until the slave has reported its offset at least once.
+ if (-1 == slaveRequestOffset) {
+ this.waitForRunning(10);
+ continue;
+ }
+
+ // First round in TRANSFER: decide where to start and which epoch that offset belongs to.
+ if (-1 == this.nextTransferFromWhere) {
+ if (0 == slaveRequestOffset) {
+ // We must ensure that the starting point of syncing log
+ // must be the startOffset of a file (maybe the last file, or the minOffset)
+ final MessageStoreConfig config = haService.getDefaultMessageStore().getMessageStoreConfig();
+ if (AutoSwitchHAConnection.this.isSyncFromLastFile) {
+ // Round the max offset down to the start of the file that contains it.
+ long masterOffset = haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
+ masterOffset = masterOffset - (masterOffset % config.getMappedFileSizeCommitLog());
+ if (masterOffset < 0) {
+ masterOffset = 0;
+ }
+ this.nextTransferFromWhere = masterOffset;
+ } else {
+ this.nextTransferFromWhere = haService.getDefaultMessageStore().getCommitLog().getMinOffset();
+ }
+ } else {
+ this.nextTransferFromWhere = slaveRequestOffset;
+ }
+ // Setup initial transferEpoch
+ EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.findEpochEntryByOffset(this.nextTransferFromWhere);
+ if (epochEntry == null) {
+ LOGGER.error("Failed to find an epochEntry to match slaveRequestOffset {}", this.nextTransferFromWhere);
+ waitForRunning(500);
+ break;
+ }
+ changeTransferEpochToNext(epochEntry);
+ LOGGER.info("Master transfer data to slave {}, from offset:{}, currentEpoch:{}",
+ AutoSwitchHAConnection.this.clientAddress, this.nextTransferFromWhere, epochEntry);
+ }
+ transferToSlave();
+ break;
+ default:
+ throw new Exception("unexpected state " + currentState);
+ }
+ } catch (Exception e) {
+ AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+ break;
+ }
+ }
+
+ this.onStop();
+
+ changeCurrentState(HAConnectionState.SHUTDOWN);
+
+ this.makeStop();
+
+ readSocketService.makeStop();
+
+ haService.removeConnection(AutoSwitchHAConnection.this);
+
+ SelectionKey sk = this.socketChannel.keyFor(this.selector);
+ if (sk != null) {
+ sk.cancel();
+ }
+
+ // NOTE(review): both closes share one try block, so a failure closing the selector skips closing the channel.
+ try {
+ this.selector.close();
+ this.socketChannel.close();
+ } catch (IOException e) {
+ AutoSwitchHAConnection.LOGGER.error("", e);
+ }
+ AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+ }
+
+ /** Selects the data to transfer next and returns its size; transferToSlave treats a result of 0 or less as "nothing to send". */
+ abstract protected int getNextTransferDataSize();
+
+ /** Releases the data selected by getNextTransferDataSize when it will not be sent in this round. */
+ abstract protected void releaseData();
+
+ /**
+ * Writes the prepared header and the selected data.
+ * transferToSlave passes the body size to send, 0 for a heartbeat, or -1 to continue an unfinished write,
+ * and stores the returned value as lastWriteOver.
+ */
+ abstract protected boolean transferData(int maxTransferSize) throws Exception;
+
+ /** Called once by run() after the write loop has ended, before the connection is torn down. */
+ abstract protected void onStop();
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
new file mode 100644
index 000000000..4ccddf215
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.DefaultHAService;
+import org.apache.rocketmq.store.ha.GroupTransferService;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
+
+/**
+ * Switchable HA service that supports switching the broker role between master and slave.
+ */
+public class AutoSwitchHAService extends DefaultHAService {
+ private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private EpochFileCache epochCache; // Created in init() from the configured epoch file path.
+ private AutoSwitchHAClient haClient; // Stays null until init() (SLAVE role) or changeToSlave() creates it.
+
+ public AutoSwitchHAService() { // Intentionally empty; all wiring happens in init().
+ }
+
+ @Override // Wires up the epoch cache, accept service, group-transfer service, optional HA client and state-notification service.
+ public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
+ this.epochCache = new EpochFileCache(defaultMessageStore.getMessageStoreConfig().getStorePathEpochFile());
+ this.epochCache.initCacheFromFile(); // Loads persisted epoch entries; the boolean result (false on IOException) is not checked here.
+ this.defaultMessageStore = defaultMessageStore;
+ this.acceptSocketService =
+ new AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+ this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
+ if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { // Only a broker configured as SLAVE gets a client here; changeToSlave() creates one lazily otherwise.
+ this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, this.epochCache);
+ }
+ this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
+ }
+
+ @Override // Switches this HA service to the master role for masterEpoch; returns false when masterEpoch is not newer than the last recorded epoch.
+ public boolean changeToMaster(int masterEpoch) {
+ final int lastEpoch = this.epochCache.lastEpoch();
+ if (masterEpoch <= lastEpoch) { // Reject stale or duplicate epochs (lastEpoch is -1 when the cache is empty).
+ return false;
+ }
+ destroyConnections();
+ // Stop ha client if needed
+ if (this.haClient != null) {
+ this.haClient.shutdown();
+ }
+
+ // Truncate dirty file
+ final long truncateOffset = truncateInvalidMsg(); // -1 means dispatch had already caught up and nothing was truncated.
+ if (truncateOffset >= 0) {
+ this.epochCache.truncateSuffixByOffset(truncateOffset); // Drop epoch entries that start at or after the truncation point.
+ }
+
+ // Append new epoch to epochFile
+ final EpochEntry newEpochEntry = new EpochEntry(masterEpoch, this.defaultMessageStore.getMaxPhyOffset());
+ if (this.epochCache.lastEpoch() >= masterEpoch) {
+ this.epochCache.truncateSuffixByEpoch(masterEpoch);
+ }
+ this.epochCache.appendEntry(newEpochEntry); // NOTE(review): appendEntry returns false when the new startOffset is not greater than the last entry's; that result is ignored and this method still returns true.
+
+ this.defaultMessageStore.recoverTopicQueueTable();
+ LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
+ return true;
+ }
+
+    /**
+     * Switch this HA service to the slave role and start the HA client against the new master.
+     *
+     * @param newMasterAddr   address of the new master broker
+     * @param newHaMasterAddr HA address of the new master
+     * @param newMasterEpoch  epoch of the new master; must be greater than the last local epoch
+     * @return true when the HA client was created or reopened and started; false when the epoch
+     * is not newer than the last local one or any step threw an exception
+     */
+    @Override
+    public boolean changeToSlave(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch) {
+        if (newMasterEpoch <= this.epochCache.lastEpoch()) {
+            return false;
+        }
+
+        boolean switched;
+        try {
+            destroyConnections();
+            // Reuse the existing client when there is one, otherwise create it on demand.
+            if (this.haClient != null) {
+                this.haClient.reOpen();
+            } else {
+                this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, this.epochCache);
+            }
+            this.haClient.updateHaMasterAddress(newHaMasterAddr);
+            this.haClient.updateMasterAddress(newMasterAddr);
+            this.haClient.start();
+            LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
+            switched = true;
+        } catch (final Exception e) {
+            LOGGER.error("Error happen when change ha to slave", e);
+            switched = false;
+        }
+        return switched;
+    }
+
+    /**
+     * Collect the client addresses of every connection that {@code isInSyncSlave} accepts
+     * against the store's current max physical offset.
+     *
+     * @return a new set of in-sync client addresses (possibly empty)
+     */
+    @Override
+    public Set<String> checkSyncStateSetChanged() {
+        final HashSet<String> inSyncAddresses = new HashSet<>(this.connectionList.size());
+        final long maxPhyOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (final HAConnection conn : this.connectionList) {
+            if (!isInSyncSlave(maxPhyOffset, conn)) {
+                continue;
+            }
+            inSyncAddresses.add(conn.getClientAddress());
+        }
+        return inSyncAddresses;
+    }
+
+ public void truncateEpochFilePrefix(final long offset) { // Delegates to the epoch cache, which removes entries whose endOffset <= offset and flushes.
+ this.epochCache.truncatePrefixByOffset(offset);
+ }
+
+ public void truncateEpochFileSuffix(final long offset) { // Delegates to the epoch cache, which removes entries whose startOffset >= offset and flushes.
+ this.epochCache.truncateSuffixByOffset(offset);
+ }
+
+ /**
+ * Try to truncate incomplete msg transferred from master.
+ *
+ * Starting at (maxPhyOffset - dispatchBehindBytes), each message is validated with
+ * checkMessageAndReturnSize. Scanning stops at the first message that fails validation,
+ * when getData returns null, or when the max physical offset is reached; the commit log
+ * is then truncated at the offset reached.
+ *
+ * @return the offset passed to truncateDirtyFiles, or -1 when dispatchBehindBytes() is not
+ * positive and nothing was truncated
+ */
+ public long truncateInvalidMsg() {
+ long dispatchBehind = this.defaultMessageStore.dispatchBehindBytes();
+ if (dispatchBehind <= 0) {
+ LOGGER.info("Dispatch complete, skip truncate");
+ return -1;
+ }
+
+ long reputFromOffset = this.defaultMessageStore.getMaxPhyOffset() - dispatchBehind; // Start scanning where dispatch left off.
+
+ boolean doNext = true; // Cleared once a message fails validation.
+ while (reputFromOffset < this.defaultMessageStore.getMaxPhyOffset() && doNext) {
+ SelectMappedBufferResult result = this.defaultMessageStore.getCommitLog().getData(reputFromOffset);
+ if (result == null) {
+ break;
+ }
+
+ try {
+ reputFromOffset = result.getStartOffset();
+
+ int readSize = 0;
+ while (readSize < result.getSize()) {
+ DispatchRequest dispatchRequest = this.defaultMessageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+
+ int size = dispatchRequest.getMsgSize();
+
+ if (dispatchRequest.isSuccess()) {
+ if (size > 0) { // Valid message: advance past it.
+ reputFromOffset += size;
+ readSize += size;
+ } else { // Success with non-positive size: continue from the offset rollNextFile returns.
+ reputFromOffset = this.defaultMessageStore.getCommitLog().rollNextFile(reputFromOffset);
+ break;
+ }
+ } else { // Validation failed: reputFromOffset now marks the truncation point.
+ doNext = false;
+ break;
+ }
+ }
+ } finally {
+ result.release(); // Always release the mapped buffer, even if validation throws.
+ }
+ }
+
+ LOGGER.info("AutoRecoverHAClient truncate commitLog to {}", reputFromOffset);
+ this.defaultMessageStore.truncateDirtyFiles(reputFromOffset);
+ return reputFromOffset;
+ }
+
+    /**
+     * Compute the confirm offset: the smallest value among the store's max physical offset
+     * and the slave ack offset of every connection in {@code connectionList}.
+     *
+     * @return the confirm offset; equals the max physical offset when there are no connections
+     */
+    public long getConfirmOffset() {
+        long lowestOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (final HAConnection conn : this.connectionList) {
+            final long ackOffset = conn.getSlaveAckOffset();
+            if (ackOffset < lowestOffset) {
+                lowestOffset = ackOffset;
+            }
+        }
+        return lowestOffset;
+    }
+
+ class AutoSwitchAcceptSocketService extends AcceptSocketService { // Accept service whose connections are AutoSwitchHAConnection instances sharing the outer service's epoch cache.
+
+ public AutoSwitchAcceptSocketService(int port) {
+ super(port);
+ }
+
+ @Override public String getServiceName() {
+ if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
+ return defaultMessageStore.getBrokerConfig().getLoggerIdentifier() + AcceptSocketService.class.getSimpleName(); // NOTE(review): uses the parent class name here but the subclass name below; confirm whether that is intentional.
+ }
+ return AutoSwitchAcceptSocketService.class.getSimpleName();
+ }
+
+ @Override protected HAConnection createConnection(SocketChannel sc) throws IOException {
+ return new AutoSwitchHAConnection(AutoSwitchHAService.this, sc, AutoSwitchHAService.this.epochCache);
+ }
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java
new file mode 100644
index 000000000..f2b8b7d85
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CheckpointFile;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Cache for epochFile.
+ * Mapping (Epoch -> StartOffset)
+ */
+public class EpochFileCache {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // Accessors take the read lock; mutators and flush() take the write lock.
+ private final Lock readLock = this.readWriteLock.readLock();
+ private final Lock writeLock = this.readWriteLock.writeLock();
+ private final TreeMap<Integer, EpochEntry> epochMap; // Keyed by epoch, so iteration order is ascending epoch.
+ private CheckpointFile<EpochEntry> checkpoint; // Null for a memory-only cache; flush() skips persistence in that case.
+
+ public EpochFileCache() { // Memory-only cache: no checkpoint file is attached.
+ this.epochMap = new TreeMap<>();
+ }
+
+ public EpochFileCache(final String path) { // Cache backed by a checkpoint file at the given path.
+ this.epochMap = new TreeMap<>();
+ this.checkpoint = new CheckpointFile<>(path, new EpochEntrySerializer());
+ }
+
+    /**
+     * Reload the cache from the checkpoint file, replacing any entries currently in memory.
+     *
+     * @return true if the entries were read and loaded; false if this cache has no backing
+     * checkpoint file (it was created with the no-arg constructor) or reading the file failed
+     */
+    public boolean initCacheFromFile() {
+        this.writeLock.lock();
+        try {
+            if (this.checkpoint == null) {
+                // Same guard as flush(): a memory-only cache has no file to load from,
+                // so report failure instead of throwing a NullPointerException.
+                log.warn("No epochFile is attached to this cache, skip init epoch entries from epochFile");
+                return false;
+            }
+            final List<EpochEntry> entries = this.checkpoint.read();
+            initEntries(entries);
+            return true;
+        } catch (final IOException e) {
+            log.error("Error happen when init epoch entries from epochFile", e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+ public void initCacheFromEntries(final List<EpochEntry> entries) { // Replaces the cache content with the given entries and persists it.
+ this.writeLock.lock();
+ try {
+ initEntries(entries); // Stores the passed entry objects themselves (no copies) and sets each one's endOffset to its successor's startOffset.
+ flush();
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private void initEntries(final List<EpochEntry> entries) {
+ this.epochMap.clear();
+ EpochEntry preEntry = null;
+ for (final EpochEntry entry : entries) {
+ this.epochMap.put(entry.getEpoch(), entry);
+ if (preEntry != null) {
+ preEntry.setEndOffset(entry.getStartOffset());
+ }
+ preEntry = entry;
+ }
+ }
+
+ public int getEntrySize() { // Number of epoch entries currently cached, read under the read lock.
+ this.readLock.lock();
+ try {
+ return this.epochMap.size();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+    /**
+     * Append a copy of the given entry as the newest epoch and persist the cache.
+     *
+     * @param entry the entry to append; both its epoch and its startOffset must be strictly
+     *              greater than those of the current last entry
+     * @return true if the entry was appended; false if it was rejected
+     */
+    public boolean appendEntry(final EpochEntry entry) {
+        this.writeLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> tail = this.epochMap.lastEntry();
+            if (tail != null) {
+                final EpochEntry lastEntry = tail.getValue();
+                if (!(lastEntry.getEpoch() < entry.getEpoch() && lastEntry.getStartOffset() < entry.getStartOffset())) {
+                    log.error("The appending entry's lastEpoch or endOffset {} is not bigger than lastEntry {}, append failed", entry, lastEntry);
+                    return false;
+                }
+                // Close the previous epoch where the new one begins.
+                lastEntry.setEndOffset(entry.getStartOffset());
+            }
+            final EpochEntry stored = new EpochEntry(entry);
+            this.epochMap.put(entry.getEpoch(), stored);
+            flush();
+            return true;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Update the endOffset of the newest epoch entry.
+     * Nothing happens when the cache is empty or when endOffset is smaller than that entry's startOffset.
+     */
+    public void setLastEpochEntryEndOffset(final long endOffset) {
+        this.writeLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> tail = this.epochMap.lastEntry();
+            if (tail == null) {
+                return;
+            }
+            final EpochEntry newest = tail.getValue();
+            if (endOffset >= newest.getStartOffset()) {
+                newest.setEndOffset(endOffset);
+            }
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * @return a copy of the entry with the smallest epoch, or null when the cache is empty
+     */
+    public EpochEntry firstEntry() {
+        this.readLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> head = this.epochMap.firstEntry();
+            return head == null ? null : new EpochEntry(head.getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * @return a copy of the entry with the largest epoch, or null when the cache is empty
+     */
+    public EpochEntry lastEntry() {
+        this.readLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> tail = this.epochMap.lastEntry();
+            return tail == null ? null : new EpochEntry(tail.getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * @return the largest epoch in the cache, or -1 when the cache is empty
+     */
+    public int lastEpoch() {
+        final EpochEntry newest = lastEntry();
+        return newest == null ? -1 : newest.getEpoch();
+    }
+
+    /**
+     * @param epoch the epoch to look up
+     * @return a copy of the entry stored for that epoch, or null if there is none
+     */
+    public EpochEntry getEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            final EpochEntry stored = this.epochMap.get(epoch);
+            if (stored == null) {
+                return null;
+            }
+            return new EpochEntry(stored);
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Find the epoch whose half-open range [startOffset, endOffset) contains the given offset.
+     *
+     * @return a copy of the first matching entry in ascending epoch order, or null if none matches
+     */
+    public EpochEntry findEpochEntryByOffset(final long offset) {
+        this.readLock.lock();
+        try {
+            for (final EpochEntry candidate : this.epochMap.values()) {
+                if (candidate.getStartOffset() > offset) {
+                    continue;
+                }
+                if (candidate.getEndOffset() > offset) {
+                    return new EpochEntry(candidate);
+                }
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Find the entry that follows the given epoch.
+     *
+     * @param epoch the reference epoch (it does not have to exist in the cache)
+     * @return a copy of the entry with the smallest epoch strictly greater than {@code epoch},
+     * or null if there is none
+     */
+    public EpochEntry nextEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            // higherEntry(epoch) is ceilingEntry(epoch + 1) without the int overflow that would
+            // wrap Integer.MAX_VALUE to Integer.MIN_VALUE and wrongly return the first entry.
+            final Map.Entry<Integer, EpochEntry> entry = this.epochMap.higherEntry(epoch);
+            if (entry != null) {
+                return new EpochEntry(entry.getValue());
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * @return copies of all cached entries in ascending epoch order
+     */
+    public List<EpochEntry> getAllEntries() {
+        this.readLock.lock();
+        try {
+            final ArrayList<EpochEntry> copies = new ArrayList<>(this.epochMap.size());
+            for (final EpochEntry stored : this.epochMap.values()) {
+                copies.add(new EpochEntry(stored));
+            }
+            return copies;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Find the consistent point between compareCache and the local cache.
+     * Local epochs are walked from newest to oldest; the first epoch that compareCache also
+     * holds with the same startOffset decides the result.
+     *
+     * @return the smaller endOffset of the two matching entries, or -1 if no epoch matches
+     */
+    public long findConsistentPoint(final EpochFileCache compareCache) {
+        this.readLock.lock();
+        try {
+            for (final Map.Entry<Integer, EpochEntry> local : this.epochMap.descendingMap().entrySet()) {
+                final EpochEntry localEntry = local.getValue();
+                final EpochEntry remoteEntry = compareCache.getEntry(local.getKey());
+                if (remoteEntry == null || remoteEntry.getStartOffset() != localEntry.getStartOffset()) {
+                    continue;
+                }
+                return Math.min(localEntry.getEndOffset(), remoteEntry.getEndOffset());
+            }
+            return -1;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Drop every epoch entry whose epoch is greater than or equal to truncateEpoch.
+     */
+    public void truncateSuffixByEpoch(final int truncateEpoch) {
+        doTruncateSuffix(candidate -> candidate.getEpoch() >= truncateEpoch);
+    }
+
+    /**
+     * Drop every epoch entry whose startOffset is greater than or equal to truncateOffset.
+     */
+    public void truncateSuffixByOffset(final long truncateOffset) {
+        doTruncateSuffix(candidate -> candidate.getStartOffset() >= truncateOffset);
+    }
+
+    /**
+     * Remove every entry matching the predicate, re-open the surviving last entry so that it
+     * has no upper bound (endOffset = Long.MAX_VALUE), and persist the result.
+     *
+     * @param predict selects the entries to remove
+     */
+    private void doTruncateSuffix(Predicate<EpochEntry> predict) {
+        this.writeLock.lock();
+        try {
+            this.epochMap.entrySet().removeIf(entry -> predict.test(entry.getValue()));
+            // Mutate the stored entry itself. The public lastEntry() returns a defensive copy,
+            // so updating that copy would leave the stale endOffset of the removed successor
+            // in the map and make findEpochEntryByOffset miss offsets beyond it.
+            final Map.Entry<Integer, EpochEntry> tail = this.epochMap.lastEntry();
+            if (tail != null) {
+                tail.getValue().setEndOffset(Long.MAX_VALUE);
+            }
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Drop every epoch entry whose endOffset is less than or equal to truncateOffset,
+     * then persist the cache.
+     */
+    public void truncatePrefixByOffset(final long truncateOffset) {
+        this.writeLock.lock();
+        try {
+            this.epochMap.values().removeIf(stored -> stored.getEndOffset() <= truncateOffset);
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Write all cached entries to the checkpoint file, if one is attached.
+     * An IOException is logged and not rethrown.
+     */
+    private void flush() {
+        this.writeLock.lock();
+        try {
+            if (this.checkpoint == null) {
+                // Memory-only cache: nothing to persist.
+                return;
+            }
+            final ArrayList<EpochEntry> snapshot = new ArrayList<>(this.epochMap.values());
+            this.checkpoint.write(snapshot);
+        } catch (final IOException e) {
+            log.error("Error happen when flush epochEntries to epochCheckpointFile", e);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Serializes an epoch entry as one checkpoint line of the form "epoch-startOffset".
+     * The endOffset is not written.
+     */
+    static class EpochEntrySerializer implements CheckpointFile.CheckpointSerializer<EpochEntry> {
+
+        /**
+         * @return the line for the entry, or null when the entry is null
+         */
+        @Override
+        public String toLine(EpochEntry entry) {
+            return entry == null
+                ? null
+                : String.format("%d-%d", entry.getEpoch(), entry.getStartOffset());
+        }
+
+        /**
+         * @return the parsed entry, or null when the line does not split into exactly two fields
+         */
+        @Override
+        public EpochEntry fromLine(String line) {
+            final String[] fields = line.split("-");
+            if (fields.length != 2) {
+                return null;
+            }
+            return new EpochEntry(Integer.parseInt(fields[0]), Long.parseLong(fields[1]));
+        }
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/AbstractHAReader.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/AbstractHAReader.java
new file mode 100644
index 000000000..105295076
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/AbstractHAReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Base class for reading HA protocol data from a socket channel into a buffer.
+ * Subclasses decide how the buffered bytes are consumed.
+ */
+public abstract class AbstractHAReader {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    protected final List<HAReadHook> readHookList = new ArrayList<>();
+
+    /**
+     * Read from the channel until the buffer is full or three consecutive reads return no bytes,
+     * passing the buffer to {@link #processReadResult(ByteBuffer)} after each non-empty read.
+     *
+     * @param socketChannel  channel to read from
+     * @param byteBufferRead buffer that receives the bytes
+     * @return false if the read threw an IOException, returned a negative size, or processing
+     * failed; true otherwise
+     */
+    public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) {
+        int consecutiveEmptyReads = 0;
+        while (byteBufferRead.hasRemaining()) {
+            int readSize;
+            try {
+                readSize = socketChannel.read(byteBufferRead);
+            } catch (IOException e) {
+                LOGGER.info("Read socket exception", e);
+                return false;
+            }
+
+            // Hooks see every read result, including zero and negative sizes.
+            for (HAReadHook hook : readHookList) {
+                hook.afterRead(readSize);
+            }
+
+            if (readSize < 0) {
+                LOGGER.info("Read socket < 0");
+                return false;
+            }
+            if (readSize == 0) {
+                if (++consecutiveEmptyReads >= 3) {
+                    break;
+                }
+                continue;
+            }
+
+            consecutiveEmptyReads = 0;
+            if (!processReadResult(byteBufferRead)) {
+                LOGGER.error("Process read result failed");
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Register a hook that is called after every socket read.
+     */
+    public void registerHook(HAReadHook readHook) {
+        readHookList.add(readHook);
+    }
+
+    /**
+     * Remove all registered hooks.
+     */
+    public void clearHook() {
+        readHookList.clear();
+    }
+
+    /**
+     * Process read result.
+     *
+     * @param byteBufferRead read result
+     * @return true if process succeed, false otherwise
+     */
+    protected abstract boolean processReadResult(ByteBuffer byteBufferRead);
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAReadHook.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAReadHook.java
new file mode 100644
index 000000000..4efcadd55
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAReadHook.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.io;
+
+public interface HAReadHook { // Callback that can be registered on AbstractHAReader via registerHook.
+ void afterRead(int readSize); // Called after each socketChannel.read with the value it returned, which may be 0 or negative.
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriteHook.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriteHook.java
new file mode 100644
index 000000000..959432888
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriteHook.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.io;
+
+public interface HAWriteHook { // Callback that can be registered on HAWriter via registerHook.
+ void afterWrite(int writeSize); // Called after each socketChannel.write with the value it returned, which may be 0.
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java
new file mode 100644
index 000000000..21daaa6f7
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Writes a buffer to a socket channel and notifies registered hooks after each write.
+ */
+public class HAWriter {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    protected final List<HAWriteHook> writeHookList = new ArrayList<>();
+
+    /**
+     * Write the buffer to the channel until it is drained or three consecutive writes make
+     * no progress.
+     *
+     * @param socketChannel   channel to write to
+     * @param byteBufferWrite buffer holding the bytes to send
+     * @return true if the whole buffer was written; false if bytes remain or the write threw
+     * an IOException
+     */
+    public boolean write(SocketChannel socketChannel, ByteBuffer byteBufferWrite) {
+        int consecutiveEmptyWrites = 0;
+        while (byteBufferWrite.hasRemaining()) {
+            int writeSize;
+            try {
+                writeSize = socketChannel.write(byteBufferWrite);
+            } catch (IOException e) {
+                LOGGER.info("Write socket exception", e);
+                return false;
+            }
+
+            // Hooks see every write result, including zero-byte writes.
+            for (HAWriteHook hook : writeHookList) {
+                hook.afterWrite(writeSize);
+            }
+
+            if (writeSize > 0) {
+                consecutiveEmptyWrites = 0;
+            } else if (writeSize < 0) {
+                LOGGER.info("Write socket < 0");
+            } else if (++consecutiveEmptyWrites >= 3) {
+                break;
+            }
+        }
+
+        return !byteBufferWrite.hasRemaining();
+    }
+
+    /**
+     * Register a hook that is called after every socket write.
+     */
+    public void registerHook(HAWriteHook writeHook) {
+        writeHookList.add(writeHook);
+    }
+
+    /**
+     * Remove all registered hooks.
+     */
+    public void clearHook() {
+        writeHookList.clear();
+    }
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java b/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
new file mode 100644
index 000000000..aadf683a2
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.ha;
+
+import java.io.File;
+import java.nio.file.Paths;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.store.ha.autoswitch.EpochFileCache;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for EpochFileCache: appending, reloading from file, truncation, lookup by offset and
+ * finding the consistent point between two caches.
+ */
+public class EpochFileCacheTest {
+    private EpochFileCache epochCache;
+    private EpochFileCache epochCache2;
+    private String path;
+    private String path2;
+
+    /**
+     * Build a file-backed cache holding <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>.
+     */
+    @Before
+    public void setup() {
+        this.path = Paths.get(File.separator + "tmp", "EpochCheckpoint").toString();
+        this.epochCache = new EpochFileCache(path);
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(3, 500)));
+        final EpochEntry entry = this.epochCache.getEntry(2);
+        // assertEquals takes (expected, actual); keeping that order gives correct failure messages.
+        assertEquals(2, entry.getEpoch());
+        assertEquals(300, entry.getStartOffset());
+        assertEquals(500, entry.getEndOffset());
+    }
+
+    @After
+    public void shutdown() {
+        new File(this.path).delete();
+        if (this.path2 != null) {
+            new File(this.path2).delete();
+        }
+    }
+
+    @Test
+    public void testInitFromFile() {
+        // Reload the cache from the checkpoint file that setup() wrote.
+        assertTrue(this.epochCache.initCacheFromFile());
+        final EpochEntry entry = this.epochCache.getEntry(2);
+        assertEquals(2, entry.getEpoch());
+        assertEquals(300, entry.getStartOffset());
+        assertEquals(500, entry.getEndOffset());
+    }
+
+    @Test
+    public void testTruncate() {
+        // Entries starting at or after offset 150 (epochs 2 and 3) must be removed.
+        this.epochCache.truncateSuffixByOffset(150);
+        assertNotNull(this.epochCache.getEntry(1));
+        assertNull(this.epochCache.getEntry(2));
+    }
+
+    @Test
+    public void testFindEpochEntryByOffset() {
+        final EpochEntry entry = this.epochCache.findEpochEntryByOffset(350);
+        assertEquals(2, entry.getEpoch());
+        assertEquals(300, entry.getStartOffset());
+        assertEquals(500, entry.getEndOffset());
+    }
+
+    @Test
+    public void testFindConsistentPointSample1() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 450)));
+        /**
+         * cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>
+         * cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 450>
+         * The consistent point should be 450
+         */
+        final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(450, consistentPoint);
+    }
+
+    @Test
+    public void testFindConsistentPointSample2() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 500)));
+        /**
+         * cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 700>
+         * cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 600>
+         * The consistent point should be 600
+         */
+        this.epochCache.setLastEpochEntryEndOffset(700);
+        this.epochCache2.setLastEpochEntryEndOffset(600);
+        final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(600, consistentPoint);
+    }
+
+    @Test
+    public void testFindConsistentPointSample3() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 200)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 500)));
+        /**
+         * cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>
+         * cache2: <Epoch1, 200>, <Epoch2, 500>
+         * No epoch has the same start offset in both caches, so the consistent point should be -1
+         */
+        final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(-1, consistentPoint);
+    }
+
+    @Test
+    public void testFindConsistentPointSample4() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 500)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(4, 800)));
+        /**
+         * cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 700>
+         * cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>, <Epoch4, 800>
+         * The consistent point should be 700
+         */
+        this.epochCache.setLastEpochEntryEndOffset(700);
+        final long consistentPoint = this.epochCache2.findConsistentPoint(this.epochCache);
+        assertEquals(700, consistentPoint);
+    }
+}
\ No newline at end of file
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
new file mode 100644
index 000000000..1f6360295
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MappedFileQueue;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the auto-switch HA service. Each test starts three local message stores
+ * (broker1..broker3) under a random directory in the user's home, switches their master/slave
+ * roles through the HA service, and verifies that messages put on the master can be read from
+ * the slaves.
+ */
+public class AutoSwitchHATest {
+    private static final String STORE_MESSAGE = "Once, there was a chance for me!";
+    private static final int DEFAULT_MAPPED_FILE_SIZE = 1024 * 1024;
+    /**
+     * Commit log file size used by the truncation tests. According to the notes in those tests,
+     * 10 messages occupy 1570 bytes, so a file of this size holds exactly 10 messages.
+     */
+    private static final int SMALL_MAPPED_FILE_SIZE = 1700;
+    /** Pause after a role change or a new slave link before the slave's content is checked. */
+    private static final long SYNC_WAIT_MILLIS = 6000L;
+    /** Pause after putting messages on the master before returning to the caller. */
+    private static final long PUT_WAIT_MILLIS = 200L;
+
+    private int queueTotal = 100;
+    private final AtomicInteger queueId = new AtomicInteger(0);
+    private SocketAddress bornHost;
+    private SocketAddress storeHost;
+    private byte[] messageBody;
+
+    private DefaultMessageStore messageStore1;
+    private DefaultMessageStore messageStore2;
+    private DefaultMessageStore messageStore3;
+    private MessageStoreConfig storeConfig1;
+    private MessageStoreConfig storeConfig2;
+    private MessageStoreConfig storeConfig3;
+    private String store1HaAddress;
+    private String store2HaAddress;
+    private String store3HaAddress;
+
+    private final BrokerStatsManager brokerStatsManager = new BrokerStatsManager("simpleTest", true);
+    private final String storePathRootParentDir = System.getProperty("user.home") + File.separator +
+        UUID.randomUUID().toString().replace("-", "");
+    private final String storePathRootDir = storePathRootParentDir + File.separator + "store";
+
+    /**
+     * Creates, loads and starts the three message stores used by a test.
+     *
+     * @param mappedFileSize commit log mapped file size applied to every store
+     * @throws Exception if a host address cannot be resolved or a store fails to start
+     */
+    public void init(int mappedFileSize) throws Exception {
+        queueTotal = 1;
+        // Explicit charset so the body bytes do not depend on the platform default.
+        messageBody = STORE_MESSAGE.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+
+        // broker1 sets no HA listen port; the address below presumably matches the default one.
+        storeConfig1 = buildStoreConfig(BrokerRole.SYNC_MASTER, "broker1", mappedFileSize);
+        store1HaAddress = "127.0.0.1:10912";
+
+        storeConfig2 = buildStoreConfig(BrokerRole.SLAVE, "broker2", mappedFileSize);
+        storeConfig2.setHaListenPort(10943);
+        store2HaAddress = "127.0.0.1:10943";
+
+        messageStore1 = buildMessageStore(storeConfig1, 0L);
+        messageStore2 = buildMessageStore(storeConfig2, 1L);
+
+        storeConfig3 = buildStoreConfig(BrokerRole.SLAVE, "broker3", mappedFileSize);
+        storeConfig3.setHaListenPort(10980);
+        messageStore3 = buildMessageStore(storeConfig3, 3L);
+        store3HaAddress = "127.0.0.1:10980";
+
+        assertTrue(messageStore1.load());
+        assertTrue(messageStore2.load());
+        assertTrue(messageStore3.load());
+        messageStore1.start();
+        messageStore2.start();
+        messageStore3.start();
+    }
+
+    /**
+     * Builds the store config shared by all three brokers: per-broker storage paths below
+     * {@code storePathRootDir}, 3 total replicas, 2 in-sync replicas and controller mode enabled.
+     *
+     * @param role initial broker role
+     * @param brokerDirName directory name of the broker below {@code storePathRootDir}
+     * @param mappedFileSize commit log mapped file size
+     * @return the populated config
+     */
+    private MessageStoreConfig buildStoreConfig(BrokerRole role, String brokerDirName, int mappedFileSize) {
+        final String brokerRootDir = storePathRootDir + File.separator + brokerDirName;
+        final MessageStoreConfig config = new MessageStoreConfig();
+        config.setBrokerRole(role);
+        config.setStorePathRootDir(brokerRootDir);
+        config.setStorePathCommitLog(brokerRootDir + File.separator + "commitlog");
+        config.setStorePathEpochFile(brokerRootDir + File.separator + "EpochFileCache");
+        config.setTotalReplicas(3);
+        config.setInSyncReplicas(2);
+        config.setStartupControllerMode(true);
+        buildMessageStoreConfig(config, mappedFileSize);
+        return config;
+    }
+
+    /**
+     * Switches {@code slave} to follow {@code master} in the given epoch, waits, then puts
+     * {@code totalPutMessageNums} messages on the master.
+     */
+    private void changeMasterAndPutMessage(DefaultMessageStore master, MessageStoreConfig masterConfig,
+        DefaultMessageStore slave, MessageStoreConfig slaveConfig, int epoch, String masterHaAddress,
+        int totalPutMessageNums) throws Exception {
+
+        // Change role
+        slaveConfig.setBrokerRole(BrokerRole.SLAVE);
+        masterConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
+        slave.getHaService().changeToSlave("", masterHaAddress, epoch);
+        master.getHaService().changeToMaster(epoch);
+        Thread.sleep(SYNC_WAIT_MILLIS);
+
+        // Put message on master
+        for (int i = 0; i < totalPutMessageNums; i++) {
+            master.putMessage(buildMessage());
+        }
+        Thread.sleep(PUT_WAIT_MILLIS);
+    }
+
+    /**
+     * Asserts that {@code totalMsgs} consecutive messages, starting at queue offset
+     * {@code startOffset} of queue 0 of topic "FooBar", can be found in {@code messageStore}.
+     */
+    private void checkMessage(final DefaultMessageStore messageStore, int totalMsgs, int startOffset) {
+        for (long i = 0; i < totalMsgs; i++) {
+            final long queueOffset = startOffset + i;
+            GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, queueOffset, 1024 * 1024, null);
+            assertThat(result).isNotNull();
+            try {
+                assertEquals("Unexpected status at queue offset " + queueOffset,
+                    GetMessageStatus.FOUND, result.getStatus());
+            } finally {
+                // Release even when the assertion fails so the result is not leaked.
+                result.release();
+            }
+        }
+    }
+
+    /**
+     * Makes broker1 the master of broker2 for epoch 1 and then epoch 2, putting 10 messages in
+     * each epoch and checking that broker2 received them.
+     */
+    private void appendTwoEpochsWithBroker1AsMaster() throws Exception {
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 2, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 20, 0);
+    }
+
+    /**
+     * Verifies broker1's commit log spans two files of {@link #SMALL_MAPPED_FILE_SIZE} bytes,
+     * deletes the first one and asserts the minimum offset moved to the second file.
+     */
+    private void deleteFirstCommitLogFileOfBroker1() {
+        final MappedFileQueue fileQueue = this.messageStore1.getCommitLog().getMappedFileQueue();
+        assertEquals(2, fileQueue.getTotalFileSize() / SMALL_MAPPED_FILE_SIZE);
+
+        final MappedFile firstFile = fileQueue.getFirstMappedFile();
+        firstFile.shutdown(1000);
+        fileQueue.retryDeleteFirstFile(1000);
+        assertEquals(SMALL_MAPPED_FILE_SIZE, this.messageStore1.getCommitLog().getMinOffset());
+    }
+
+    @Test
+    public void testChangeRoleManyTimes() throws Exception {
+        // Step1, change store1 to master, store2 to follower
+        init(DEFAULT_MAPPED_FILE_SIZE);
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+
+        // Step2, change store1 to follower, store2 to master, epoch = 2
+        changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore1, this.storeConfig1, 2, store2HaAddress, 10);
+        checkMessage(this.messageStore1, 20, 0);
+
+        // Step3, change store2 to follower, store1 to master, epoch = 3
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 3, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 30, 0);
+    }
+
+    @Test
+    public void testAddBroker() throws Exception {
+        // Step1: broker1 as leader, broker2 as follower
+        init(DEFAULT_MAPPED_FILE_SIZE);
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+
+        // Step2: add new broker3, link to broker1
+        messageStore3.getHaService().changeToSlave("", store1HaAddress, 1);
+        Thread.sleep(SYNC_WAIT_MILLIS);
+        checkMessage(messageStore3, 10, 0);
+    }
+
+    @Test
+    public void testTruncateEpochLogAndAddBroker() throws Exception {
+        // Note that 10 messages take 1570 bytes in total, so with mappedFileSize = 1700 one file stores exactly 10 messages.
+        init(SMALL_MAPPED_FILE_SIZE);
+
+        // Step1: broker1 as leader, broker2 as follower, append 2 epochs; each epoch is stored in its own file
+        // (because fileSize = 1700 can only hold 10 messages).
+        // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+        appendTwoEpochsWithBroker1AsMaster();
+
+        // Step2: check file position; epoch1 was stored in the first file, epoch2 in the second file.
+        // Step3: truncate epoch1's log (truncateEndOffset = 1570), which means the first file is deleted directly.
+        deleteFirstCommitLogFileOfBroker1();
+        checkMessage(this.messageStore1, 10, 10);
+
+        final AutoSwitchHAService haService = (AutoSwitchHAService) this.messageStore1.getHaService();
+        haService.truncateEpochFilePrefix(1570);
+
+        // Step4: add broker3 as slave, it only has 10 messages starting from offset 10.
+        messageStore3.getHaService().changeToSlave("", store1HaAddress, 2);
+        Thread.sleep(SYNC_WAIT_MILLIS);
+
+        checkMessage(messageStore3, 10, 10);
+    }
+
+    @Test
+    public void testTruncateEpochLogAndChangeMaster() throws Exception {
+        // Note that 10 messages take 1570 bytes in total, so with mappedFileSize = 1700 one file stores exactly 10 messages.
+        init(SMALL_MAPPED_FILE_SIZE);
+
+        // Step1: broker1 as leader, broker2 as follower, append 2 epochs; each epoch is stored in its own file
+        // (because fileSize = 1700 can only hold 10 messages).
+        // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+        appendTwoEpochsWithBroker1AsMaster();
+
+        // Step2: check file position; epoch1 was stored in the first file, epoch2 in the second file.
+        // Step3: truncate epoch1's log (truncateEndOffset = 1570), which means the first file is deleted directly.
+        deleteFirstCommitLogFileOfBroker1();
+
+        final AutoSwitchHAService haService = (AutoSwitchHAService) this.messageStore1.getHaService();
+        haService.truncateEpochFilePrefix(1570);
+        checkMessage(this.messageStore1, 10, 10);
+
+        // Step4: add broker3 as slave
+        messageStore3.getHaService().changeToSlave("", store1HaAddress, 2);
+        Thread.sleep(SYNC_WAIT_MILLIS);
+        checkMessage(messageStore3, 10, 10);
+
+        // Step5: change broker2 as leader, broker3 as follower
+        changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore3, this.storeConfig3, 3, this.store2HaAddress, 10);
+        checkMessage(messageStore3, 20, 10);
+
+        // Step6, let broker1 link to broker2, it should sync log from epoch3.
+        this.storeConfig1.setBrokerRole(BrokerRole.SLAVE);
+        this.messageStore1.getHaService().changeToSlave("", this.store2HaAddress, 3);
+        Thread.sleep(SYNC_WAIT_MILLIS);
+        checkMessage(messageStore1, 20, 0);
+    }
+
+    @Test
+    public void testAddBrokerAndSyncFromLastFile() throws Exception {
+        init(SMALL_MAPPED_FILE_SIZE);
+
+        // Step1: broker1 as leader, broker2 as follower, append 2 epochs; each epoch is stored in its own file
+        // (because fileSize = 1700 can only hold 10 messages).
+        // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+        appendTwoEpochsWithBroker1AsMaster();
+
+        // Step2: restart broker3 with syncFromLastFile enabled
+        messageStore3.shutdown();
+        messageStore3.destroy();
+
+        storeConfig3.setSyncFromLastFile(true);
+        messageStore3 = buildMessageStore(storeConfig3, 3L);
+        assertTrue(messageStore3.load());
+        messageStore3.start();
+
+        // Step3: add new broker3, link to broker1. Because broker3 requests sync from the last file,
+        // it only syncs 10 messages starting from offset 10.
+        messageStore3.getHaService().changeToSlave("", store1HaAddress, 2);
+        Thread.sleep(SYNC_WAIT_MILLIS);
+        checkMessage(messageStore3, 10, 10);
+    }
+
+    /**
+     * Shuts down and destroys every store that was created, then removes the test directory.
+     * Stores are null-checked so a failure inside {@link #init(int)} is not masked by a
+     * NullPointerException here, and the directory is removed even if a shutdown throws.
+     */
+    @After
+    public void destroy() throws Exception {
+        Thread.sleep(5000L);
+        try {
+            shutdownAndDestroy(messageStore2);
+            shutdownAndDestroy(messageStore1);
+            shutdownAndDestroy(messageStore3);
+        } finally {
+            UtilAll.deleteFile(new File(storePathRootParentDir));
+        }
+    }
+
+    /** Shuts down and destroys {@code store}; does nothing when it was never created. */
+    private static void shutdownAndDestroy(DefaultMessageStore store) throws Exception {
+        if (store == null) {
+            return;
+        }
+        store.shutdown();
+        store.destroy();
+    }
+
+    /** Creates a message store for the given config and broker id, sharing the stats manager. */
+    private DefaultMessageStore buildMessageStore(MessageStoreConfig messageStoreConfig,
+        long brokerId) throws Exception {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setBrokerId(brokerId);
+        return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig);
+    }
+
+    /** Applies the file-size, index and flush settings common to all stores in this test. */
+    private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig, int mappedFileSize) {
+        messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
+        messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024);
+        messageStoreConfig.setMaxHashSlotNum(10000);
+        messageStoreConfig.setMaxIndexNum(100 * 100);
+        messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+        messageStoreConfig.setFlushIntervalConsumeQueue(1);
+    }
+
+    /** Builds a "FooBar" message carrying the fixed test body, spread over {@code queueTotal} queues. */
+    private MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic("FooBar");
+        msg.setTags("TAG1");
+        msg.setBody(messageBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(Math.abs(queueId.getAndIncrement()) % queueTotal);
+        msg.setSysFlag(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(storeHost);
+        msg.setBornHost(bornHost);
+        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        return msg;
+    }
+}