You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/24 04:29:29 UTC
[iotdb] branch native_raft updated: fix log serialization
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 37228be3f1 fix log serialization
37228be3f1 is described below
commit 37228be3f1b05641d6736ff3d84d1b64f6a0821c
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Apr 24 12:31:55 2023 +0800
fix log serialization
---
.../consensus/natraft/protocol/RaftMember.java | 2 +-
.../natraft/protocol/log/catchup/CatchUpTask.java | 2 +-
.../protocol/log/manager/RaftLogManager.java | 3 +-
.../manager/serialization/StableEntryManager.java | 3 +-
.../serialization/SyncLogDequeSerializer.java | 172 ++++++++++++---------
.../apache/iotdb/consensus/raft/ReplicateTest.java | 68 --------
.../consensus/raft/util/TestStateMachine.java | 3 +
thrift-raft/pom.xml | 2 +-
8 files changed, 108 insertions(+), 147 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index a7c00d3bc2..cbac7ac20a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -987,7 +987,7 @@ public class RaftMember {
logger.debug("{}: plan {} has no where to be forwarded", name, plan);
return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader to forward in: " + groupId);
}
- logger.info("{}: Forward {} to node {}", name, plan, node);
+ logger.debug("{}: Forward {} to node {}", name, plan, node);
TSStatus status;
status = forwardPlanAsync(plan, node, groupId);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
index 15ecbca8b9..3a177e3f9e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
@@ -196,7 +196,7 @@ public class CatchUpTask implements Runnable {
private List<Entry> getLogsInStableEntryManager(long startIndex, long endIndex) {
List<Entry> logsInDisk =
- raftMember.getLogManager().getStableEntryManager().getEntries(startIndex, endIndex);
+ raftMember.getLogManager().getStableEntryManager().getEntries(startIndex, endIndex, true);
logger.debug(
"{}, found {} logs in disk to catchup {}, startIndex={}, endIndex={}",
raftMember.getName(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index ebb0a0f37a..d73613c7a3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -298,7 +298,7 @@ public abstract class RaftLogManager {
if (index < firstIndex) {
// search in disk
if (config.isEnableRaftLogPersistence()) {
- List<Entry> logsInDisk = getStableEntryManager().getEntries(index, index);
+ List<Entry> logsInDisk = getStableEntryManager().getEntries(index, index, false);
if (logsInDisk.isEmpty()) {
return -1;
} else {
@@ -764,6 +764,7 @@ public abstract class RaftLogManager {
}
public void close() {
+ getStableEntryManager().updateMeta(commitIndex, appliedIndex);
getStableEntryManager().close();
if (deleteLogExecutorService != null) {
deleteLogExecutorService.shutdownNow();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index b1768e5a24..92e7603036 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -44,6 +44,7 @@ public interface StableEntryManager {
HardState getHardState();
+ void updateMeta(long commitIndex, long applyIndex);
LogManagerMeta getMeta();
/**
@@ -51,7 +52,7 @@ public interface StableEntryManager {
* @param endIndex (inclusive) the log end index
* @return the raft log which index between [startIndex, endIndex] or empty if not found
*/
- List<Entry> getEntries(long startIndex, long endIndex);
+ List<Entry> getEntries(long startIndex, long endIndex, boolean limitBatch);
void close();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 24b6755771..90ef700867 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -71,10 +71,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private static final String LOG_DATA_FILE_SUFFIX = "data";
private static final String LOG_INDEX_FILE_SUFFIX = "idx";
- /** the log data files */
+ /**
+ * the log data files
+ */
private List<File> logDataFileList;
- /** the log index files */
+ /**
+ * the log index files
+ */
private List<IndexFileDescriptor> logIndexFileList;
private LogParser parser = LogParser.getINSTANCE();
@@ -84,10 +88,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private LogManagerMeta meta;
private HardState state;
- /** min version of available log */
+ /**
+ * min version of available log
+ */
private long minAvailableVersion = 0;
- /** max version of available log */
+ /**
+ * max version of available log
+ */
private long maxAvailableVersion = Long.MAX_VALUE;
private String logDir;
@@ -142,7 +150,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
- /** the lock uses when change the log data files or log index files */
+ /**
+ * the lock uses when change the log data files or log index files
+ */
private final Lock lock = new ReentrantLock();
private volatile boolean isClosed = false;
@@ -201,23 +211,29 @@ public class SyncLogDequeSerializer implements StableEntryManager {
return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
}
- /** for log tools */
+ /**
+ * for log tools
+ */
@Override
public LogManagerMeta getMeta() {
return meta;
}
- /** Recover all the logs in disk. This function will be called once this instance is created. */
+ @Override
+ public void updateMeta(long commitIndex, long applyIndex) {
+ meta.setCommitLogIndex(commitIndex);
+ meta.setLastAppliedIndex(applyIndex);
+ }
+
+ /**
+ * Recover all the logs in disk. This function will be called once this instance is created.
+ */
@Override
public List<Entry> getAllEntriesAfterAppliedIndex() {
logger.debug(
- "getAllEntriesBeforeAppliedIndex, maxHaveAppliedCommitIndex={}, commitLogIndex={}",
- meta.getLastAppliedIndex(),
- meta.getCommitLogIndex());
- if (meta.getLastAppliedIndex() >= meta.getCommitLogIndex()) {
- return Collections.emptyList();
- }
- return getEntries(meta.getLastAppliedIndex(), meta.getCommitLogIndex());
+ "getAllEntriesBeforeAppliedIndex, maxHaveAppliedCommitIndex={}",
+ meta.getLastAppliedIndex());
+ return getEntries(meta.getLastAppliedIndex(), Long.MAX_VALUE, false);
}
/**
@@ -225,7 +241,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
* is uncommitted for persistent LogManagerMeta(meta's info is stale).We need to recover these
* already persistent logs.
*
- * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually been
+ * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually
+ * been
* flushed to 7,when we restart cluster,we need to recover 6 and 7.
*
* <p>Maybe,we can extract getAllEntriesAfterAppliedIndex and getAllEntriesAfterCommittedIndex
@@ -241,7 +258,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
if (meta.getCommitLogIndex() >= lastIndex) {
return Collections.emptyList();
}
- return getEntries(meta.getCommitLogIndex() + 1, lastIndex);
+ return getEntries(meta.getCommitLogIndex() + 1, lastIndex, false);
}
@Override
@@ -379,8 +396,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
byte[] compressed =
compressor.compress(
logDataBuffer.array(),
- logDataBuffer.arrayOffset() + logDataBuffer.position(),
- logDataBuffer.remaining());
+ 0,
+ logDataBuffer.position());
ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
currentLogDataOutputStream.write(compressed);
logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
@@ -428,7 +445,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /** flush the log buffer and check if the file needs to be closed */
+ /**
+ * flush the log buffer and check if the file needs to be closed
+ */
@Override
public void forceFlushLogBuffer() {
lock.lock();
@@ -472,7 +491,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /** The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data */
+ /**
+ * The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data
+ */
private void recoverLogFiles() {
// 1. first we should recover the log index file
recoverLogFiles(LOG_INDEX_FILE_SUFFIX);
@@ -519,9 +540,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* Check that the file is legal or not
*
- * @param file file needs to be check
- * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or {@link
- * SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
+ * @param file file needs to be check
+ * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
+ * {@link SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
* @return true if the file legal otherwise false
*/
private boolean checkLogFile(File file, String fileType) {
@@ -749,7 +770,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
}
- /** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
+ /**
+ * for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version}
+ */
private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
lock.lock();
try {
@@ -867,23 +890,25 @@ public class SyncLogDequeSerializer implements StableEntryManager {
lock.lock();
forceFlushLogBuffer();
try {
- closeCurrentFile(meta.getCommitLogIndex());
- if (persistLogDeleteExecutorService != null) {
- persistLogDeleteExecutorService.shutdownNow();
- persistLogDeleteLogFuture.cancel(true);
- persistLogDeleteExecutorService.awaitTermination(20, TimeUnit.SECONDS);
- persistLogDeleteExecutorService = null;
- }
+ closeCurrentFile(lastLogIndex);
} catch (IOException e) {
logger.error("Error in log serialization: ", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn("Close persist log delete thread interrupted");
} finally {
logger.info("{} is closed", this);
isClosed = true;
lock.unlock();
}
+
+ if (persistLogDeleteExecutorService != null) {
+ persistLogDeleteExecutorService.shutdownNow();
+ persistLogDeleteLogFuture.cancel(true);
+ try {
+ persistLogDeleteExecutorService.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ persistLogDeleteExecutorService = null;
+ }
}
@Override
@@ -1092,11 +1117,11 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the log start index
- * @param endIndex the log end index
+ * @param endIndex the log end index
* @return the raft log which index between [startIndex, endIndex] or empty if not found
*/
@Override
- public List<Entry> getEntries(long startIndex, long endIndex) {
+ public List<Entry> getEntries(long startIndex, long endIndex, boolean limitBatch) {
if (startIndex > endIndex) {
logger.error(
"startIndex={} should be less than or equal to endIndex={}", startIndex, endIndex);
@@ -1107,7 +1132,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
long newEndIndex;
- if (endIndex - startIndex > maxNumberOfLogsPerFetchOnDisk) {
+ if (limitBatch && endIndex - startIndex > maxNumberOfLogsPerFetchOnDisk) {
newEndIndex = startIndex + maxNumberOfLogsPerFetchOnDisk;
} else {
newEndIndex = endIndex;
@@ -1152,15 +1177,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
public long getOffsetAccordingToLogIndex(long logIndex) {
long offset = -1;
- long maxLogIndex = firstLogIndex + logIndexOffsetList.size();
- if (logIndex >= maxLogIndex) {
- logger.error(
- "given log index={} exceed the max log index={}, firstLogIndex={}",
- logIndex,
- maxLogIndex,
- firstLogIndex);
- return -1;
- }
// 1. first find in memory
if (logIndex >= firstLogIndex) {
for (Pair<Long, Long> indexOffset : logIndexOffsetList) {
@@ -1206,9 +1222,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the log start index
- * @param endIndex the log end index
+ * @param endIndex the log end index
* @return first value-> the log data file, second value-> the left value is the start offset of
- * the file, the right is the end offset of the file
+ * the file, the right is the end offset of the file
*/
private List<Pair<File, Pair<Long, Long>>> getLogDataFileAndOffset(
long startIndex, long endIndex) {
@@ -1234,7 +1250,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
long endOffset = getOffsetAccordingToLogIndex(endIndexInOneFile);
fileNameWithStartAndEndOffset.add(
new Pair<>(logDataFileWithStartAndEndLogIndex.left, new Pair<>(startOffset, endOffset)));
-
logger.debug(
"get log data offset=[{},{}] according to log index=[{},{}], file={}",
startOffset,
@@ -1242,38 +1257,41 @@ public class SyncLogDequeSerializer implements StableEntryManager {
startIndexInOneFile,
endIndexInOneFile,
logDataFileWithStartAndEndLogIndex.left);
+ logDataFileWithStartAndEndLogIndex = null;
+
// 4. search the next file to get the log index of fileEndLogIndex + 1
startIndexInOneFile = endIndexInOneFile + 1;
startOffset = getOffsetAccordingToLogIndex(startIndexInOneFile);
if (startOffset == -1) {
- return Collections.emptyList();
+ break;
}
logDataFileWithStartAndEndLogIndex = getLogDataFile(startIndexInOneFile);
- if (logDataFileWithStartAndEndLogIndex == null) {
- return Collections.emptyList();
- }
endIndexInOneFile = logDataFileWithStartAndEndLogIndex.right.right;
}
- // this means the endIndex's offset can not be found in the file
- // logDataFileWithStartAndEndLogIndex.left
- long endOffset = getOffsetAccordingToLogIndex(endIndex);
- fileNameWithStartAndEndOffset.add(
- new Pair<>(logDataFileWithStartAndEndLogIndex.left, new Pair<>(startOffset, endOffset)));
- logger.debug(
- "get log data offset=[{},{}] according to log index=[{},{}], file={}",
- startOffset,
- endOffset,
- startIndexInOneFile,
- endIndex,
- logDataFileWithStartAndEndLogIndex.left);
+
+ if (logDataFileWithStartAndEndLogIndex != null) {
+ // this means the endIndex's offset can not be found in the file
+ // logDataFileWithStartAndEndLogIndex.left
+ long endOffset = getOffsetAccordingToLogIndex(
+ Math.min(endIndex, logDataFileWithStartAndEndLogIndex.right.right));
+ fileNameWithStartAndEndOffset.add(
+ new Pair<>(logDataFileWithStartAndEndLogIndex.left, new Pair<>(startOffset, endOffset)));
+ logger.debug(
+ "get log data offset=[{},{}] according to log index=[{},{}], file={}",
+ startOffset,
+ endOffset,
+ startIndexInOneFile,
+ endIndex,
+ logDataFileWithStartAndEndLogIndex.left);
+ }
return fileNameWithStartAndEndOffset;
}
/**
* @param startIndex the start log index
* @return the first value of the pair is the log index file which contains the start index; the
- * second pair's first value is the file's start log index. the second pair's second value is
- * the file's end log index. null if not found
+ * second pair's first value is the file's start log index. the second pair's second value is the
+ * file's end log index. null if not found
*/
public IndexFileDescriptor getLogIndexFile(long startIndex) {
for (IndexFileDescriptor descriptor : logIndexFileList) {
@@ -1288,8 +1306,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
/**
* @param startIndex the start log index
* @return the first value of the pair is the log data file which contains the start index; the
- * second pair's first value is the file's start log index. the second pair's second value is
- * the file's end log index. null if not found
+ * second pair's first value is the file's start log index. the second pair's second value is the
+ * file's end log index. null if not found
*/
public Pair<File, Pair<Long, Long>> getLogDataFile(long startIndex) {
for (File file : logDataFileList) {
@@ -1308,9 +1326,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
/**
- * @param file the log data file
+ * @param file the log data file
* @param startAndEndOffset the left value is the start offset of the file, the right is the end
- * offset of the file
+ * offset of the file
* @return the logs between start offset and end offset
*/
private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long> startAndEndOffset) {
@@ -1403,7 +1421,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
* automatically increased by saveInterval to avoid conflicts.
*/
private static long saveInterval = 100;
- /** time partition id to dividing time series into different storage group */
+ /**
+ * time partition id to dividing time series into different storage group
+ */
private long timePartitionId;
private long prevVersion;
@@ -1417,7 +1437,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
restore();
}
- /** only used for upgrading */
+ /**
+ * only used for upgrading
+ */
public SimpleFileVersionController(String directoryPath) throws IOException {
this.directoryPath = directoryPath + File.separator + UPGRADE_DIR;
restore();
@@ -1480,7 +1502,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
prevVersion = currVersion;
}
- /** recovery from disk */
+ /**
+ * recovery from disk
+ */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void restore() throws IOException {
File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
index 777dec7866..57db4e731b 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
@@ -184,72 +184,4 @@ public class ReplicateTest {
Assert.assertEquals(CHECK_POINT - 1, servers.get(1).getMember(gid).getAppliedIndex());
Assert.assertEquals(CHECK_POINT - 1, servers.get(2).getMember(gid).getAppliedIndex());
}
-
- /**
- * First, suspend one node to test that the request replication between the two alive nodes is ok,
- * then restart all nodes to lose state in the queue, and test using WAL replication to make all
- * nodes finally consistent
- */
- @Test
- public void Replicate2NodeTest() throws IOException, InterruptedException {
- logger.info("Start ReplicateUsingWALTest");
- servers.get(0).createPeer(group.getGroupId(), group.getPeers());
- servers.get(1).createPeer(group.getGroupId(), group.getPeers());
-
- Assert.assertEquals(-1, servers.get(0).getMember(gid).getLastIndex());
- Assert.assertEquals(-1, servers.get(1).getMember(gid).getLastIndex());
-
- for (int i = 0; i < CHECK_POINT; i++) {
- servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
- }
-
- for (int i = 0; i < 2; i++) {
- long start = System.currentTimeMillis();
- // should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
- // replicating all entries
- while (servers.get(i).getMember(gid).getAppliedIndex() < CHECK_POINT - 1) {
- long current = System.currentTimeMillis();
- if ((current - start) > 60 * 1000) {
- logger.error("{}", servers.get(i).getMember(gid).getAppliedIndex());
- Assert.fail("Unable to replicate entries");
- }
- Thread.sleep(100);
- }
- }
-
- Assert.assertEquals(CHECK_POINT - 1, servers.get(0).getMember(gid).getAppliedIndex());
- Assert.assertEquals(CHECK_POINT - 1, servers.get(1).getMember(gid).getAppliedIndex());
-
- stopServer();
- initServer();
-
- servers.get(2).createPeer(group.getGroupId(), group.getPeers());
-
- Assert.assertEquals(peers, servers.get(0).getMember(gid).getConfiguration());
- Assert.assertEquals(peers, servers.get(1).getMember(gid).getConfiguration());
- Assert.assertEquals(peers, servers.get(2).getMember(gid).getConfiguration());
-
- Assert.assertEquals(CHECK_POINT - 1, servers.get(0).getMember(gid).getLastIndex());
- Assert.assertEquals(CHECK_POINT - 1, servers.get(1).getMember(gid).getLastIndex());
- Assert.assertEquals(-1, servers.get(2).getMember(gid).getLastIndex());
-
- for (int i = 0; i < 3; i++) {
- long start = System.currentTimeMillis();
- while (servers.get(i).getMember(gid).getAppliedIndex() < CHECK_POINT - 1) {
- long current = System.currentTimeMillis();
- if ((current - start) > 60 * 1000) {
- logger.error("{}", servers.get(i).getMember(gid).getAppliedIndex());
- Assert.fail("Unable to replicate entries");
- }
- Thread.sleep(100);
- }
- }
-
- Assert.assertEquals(CHECK_POINT, stateMachines.get(0).getRequestSet().size());
- Assert.assertEquals(CHECK_POINT, stateMachines.get(1).getRequestSet().size());
- Assert.assertEquals(CHECK_POINT, stateMachines.get(2).getRequestSet().size());
-
- Assert.assertEquals(stateMachines.get(0).read(null), stateMachines.get(1).read(null));
- Assert.assertEquals(stateMachines.get(2).read(null), stateMachines.get(1).read(null));
- }
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
index 69ae1bd783..b22a465c69 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
@@ -66,6 +66,9 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
@Override
public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+ if (request instanceof TestEntry) {
+ return request;
+ }
ByteBufferConsensusRequest byteBufferConsensusRequest = (ByteBufferConsensusRequest) request;
ByteBuffer byteBuffer = byteBufferConsensusRequest.serializeToByteBuffer();
byteBuffer.rewind();
diff --git a/thrift-raft/pom.xml b/thrift-raft/pom.xml
index 45994aeaa9..10548f21cd 100644
--- a/thrift-raft/pom.xml
+++ b/thrift-raft/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-parent</artifactId>
- <version>1.1.0-SNAPSHOT</version>
+ <version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>iotdb-thrift-raft-consensus</artifactId>