You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/24 04:29:29 UTC

[iotdb] branch native_raft updated: fix log serialization

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 37228be3f1 fix log serialization
37228be3f1 is described below

commit 37228be3f1b05641d6736ff3d84d1b64f6a0821c
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Apr 24 12:31:55 2023 +0800

    fix log serialization
---
 .../consensus/natraft/protocol/RaftMember.java     |   2 +-
 .../natraft/protocol/log/catchup/CatchUpTask.java  |   2 +-
 .../protocol/log/manager/RaftLogManager.java       |   3 +-
 .../manager/serialization/StableEntryManager.java  |   3 +-
 .../serialization/SyncLogDequeSerializer.java      | 172 ++++++++++++---------
 .../apache/iotdb/consensus/raft/ReplicateTest.java |  68 --------
 .../consensus/raft/util/TestStateMachine.java      |   3 +
 thrift-raft/pom.xml                                |   2 +-
 8 files changed, 108 insertions(+), 147 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index a7c00d3bc2..cbac7ac20a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -987,7 +987,7 @@ public class RaftMember {
       logger.debug("{}: plan {} has no where to be forwarded", name, plan);
       return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader to forward in: " + groupId);
     }
-    logger.info("{}: Forward {} to node {}", name, plan, node);
+    logger.debug("{}: Forward {} to node {}", name, plan, node);
 
     TSStatus status;
     status = forwardPlanAsync(plan, node, groupId);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
index 15ecbca8b9..3a177e3f9e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
@@ -196,7 +196,7 @@ public class CatchUpTask implements Runnable {
 
   private List<Entry> getLogsInStableEntryManager(long startIndex, long endIndex) {
     List<Entry> logsInDisk =
-        raftMember.getLogManager().getStableEntryManager().getEntries(startIndex, endIndex);
+        raftMember.getLogManager().getStableEntryManager().getEntries(startIndex, endIndex, true);
     logger.debug(
         "{}, found {} logs in disk to catchup {}, startIndex={}, endIndex={}",
         raftMember.getName(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index ebb0a0f37a..d73613c7a3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -298,7 +298,7 @@ public abstract class RaftLogManager {
       if (index < firstIndex) {
         // search in disk
         if (config.isEnableRaftLogPersistence()) {
-          List<Entry> logsInDisk = getStableEntryManager().getEntries(index, index);
+          List<Entry> logsInDisk = getStableEntryManager().getEntries(index, index, false);
           if (logsInDisk.isEmpty()) {
             return -1;
           } else {
@@ -764,6 +764,7 @@ public abstract class RaftLogManager {
   }
 
   public void close() {
+    getStableEntryManager().updateMeta(commitIndex, appliedIndex);
     getStableEntryManager().close();
     if (deleteLogExecutorService != null) {
       deleteLogExecutorService.shutdownNow();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index b1768e5a24..92e7603036 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -44,6 +44,7 @@ public interface StableEntryManager {
 
   HardState getHardState();
 
+  void updateMeta(long commitIndex, long applyIndex);
   LogManagerMeta getMeta();
 
   /**
@@ -51,7 +52,7 @@ public interface StableEntryManager {
    * @param endIndex (inclusive) the log end index
    * @return the raft log which index between [startIndex, endIndex] or empty if not found
    */
-  List<Entry> getEntries(long startIndex, long endIndex);
+  List<Entry> getEntries(long startIndex, long endIndex, boolean limitBatch);
 
   void close();
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 24b6755771..90ef700867 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -71,10 +71,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private static final String LOG_DATA_FILE_SUFFIX = "data";
   private static final String LOG_INDEX_FILE_SUFFIX = "idx";
 
-  /** the log data files */
+  /**
+   * the log data files
+   */
   private List<File> logDataFileList;
 
-  /** the log index files */
+  /**
+   * the log index files
+   */
   private List<IndexFileDescriptor> logIndexFileList;
 
   private LogParser parser = LogParser.getINSTANCE();
@@ -84,10 +88,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private LogManagerMeta meta;
   private HardState state;
 
-  /** min version of available log */
+  /**
+   * min version of available log
+   */
   private long minAvailableVersion = 0;
 
-  /** max version of available log */
+  /**
+   * max version of available log
+   */
   private long maxAvailableVersion = Long.MAX_VALUE;
 
   private String logDir;
@@ -142,7 +150,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
 
-  /** the lock uses when change the log data files or log index files */
+  /**
+   * the lock uses when change the log data files or log index files
+   */
   private final Lock lock = new ReentrantLock();
 
   private volatile boolean isClosed = false;
@@ -201,23 +211,29 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
   }
 
-  /** for log tools */
+  /**
+   * for log tools
+   */
   @Override
   public LogManagerMeta getMeta() {
     return meta;
   }
 
-  /** Recover all the logs in disk. This function will be called once this instance is created. */
+  @Override
+  public void updateMeta(long commitIndex, long applyIndex) {
+    meta.setCommitLogIndex(commitIndex);
+    meta.setLastAppliedIndex(applyIndex);
+  }
+
+  /**
+   * Recover all the logs in disk. This function will be called once this instance is created.
+   */
   @Override
   public List<Entry> getAllEntriesAfterAppliedIndex() {
     logger.debug(
-        "getAllEntriesBeforeAppliedIndex, maxHaveAppliedCommitIndex={}, commitLogIndex={}",
-        meta.getLastAppliedIndex(),
-        meta.getCommitLogIndex());
-    if (meta.getLastAppliedIndex() >= meta.getCommitLogIndex()) {
-      return Collections.emptyList();
-    }
-    return getEntries(meta.getLastAppliedIndex(), meta.getCommitLogIndex());
+        "getAllEntriesBeforeAppliedIndex, maxHaveAppliedCommitIndex={}",
+        meta.getLastAppliedIndex());
+    return getEntries(meta.getLastAppliedIndex(), Long.MAX_VALUE, false);
   }
 
   /**
@@ -225,7 +241,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    * is uncommitted for persistent LogManagerMeta(meta's info is stale).We need to recover these
    * already persistent logs.
    *
-   * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually been
+   * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually
+   * been
    * flushed to 7,when we restart cluster,we need to recover 6 and 7.
    *
    * <p>Maybe,we can extract getAllEntriesAfterAppliedIndex and getAllEntriesAfterCommittedIndex
@@ -241,7 +258,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     if (meta.getCommitLogIndex() >= lastIndex) {
       return Collections.emptyList();
     }
-    return getEntries(meta.getCommitLogIndex() + 1, lastIndex);
+    return getEntries(meta.getCommitLogIndex() + 1, lastIndex, false);
   }
 
   @Override
@@ -379,8 +396,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         byte[] compressed =
             compressor.compress(
                 logDataBuffer.array(),
-                logDataBuffer.arrayOffset() + logDataBuffer.position(),
-                logDataBuffer.remaining());
+                0,
+                logDataBuffer.position());
         ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
         currentLogDataOutputStream.write(compressed);
         logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
@@ -428,7 +445,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /** flush the log buffer and check if the file needs to be closed */
+  /**
+   * flush the log buffer and check if the file needs to be closed
+   */
   @Override
   public void forceFlushLogBuffer() {
     lock.lock();
@@ -472,7 +491,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /** The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data */
+  /**
+   * The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data
+   */
   private void recoverLogFiles() {
     // 1. first we should recover the log index file
     recoverLogFiles(LOG_INDEX_FILE_SUFFIX);
@@ -519,9 +540,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * Check that the file is legal or not
    *
-   * @param file file needs to be check
-   * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or {@link
-   *     SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
+   * @param file     file needs to be check
+   * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
+   *                 {@link SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
    * @return true if the file legal otherwise false
    */
   private boolean checkLogFile(File file, String fileType) {
@@ -749,7 +770,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
+  /**
+   * for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version}
+   */
   private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
     lock.lock();
     try {
@@ -867,23 +890,25 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     lock.lock();
     forceFlushLogBuffer();
     try {
-      closeCurrentFile(meta.getCommitLogIndex());
-      if (persistLogDeleteExecutorService != null) {
-        persistLogDeleteExecutorService.shutdownNow();
-        persistLogDeleteLogFuture.cancel(true);
-        persistLogDeleteExecutorService.awaitTermination(20, TimeUnit.SECONDS);
-        persistLogDeleteExecutorService = null;
-      }
+      closeCurrentFile(lastLogIndex);
     } catch (IOException e) {
       logger.error("Error in log serialization: ", e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      logger.warn("Close persist log delete thread interrupted");
     } finally {
       logger.info("{} is closed", this);
       isClosed = true;
       lock.unlock();
     }
+
+    if (persistLogDeleteExecutorService != null) {
+      persistLogDeleteExecutorService.shutdownNow();
+      persistLogDeleteLogFuture.cancel(true);
+      try {
+        persistLogDeleteExecutorService.awaitTermination(20, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      persistLogDeleteExecutorService = null;
+    }
   }
 
   @Override
@@ -1092,11 +1117,11 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /**
    * @param startIndex the log start index
-   * @param endIndex the log end index
+   * @param endIndex   the log end index
    * @return the raft log which index between [startIndex, endIndex] or empty if not found
    */
   @Override
-  public List<Entry> getEntries(long startIndex, long endIndex) {
+  public List<Entry> getEntries(long startIndex, long endIndex, boolean limitBatch) {
     if (startIndex > endIndex) {
       logger.error(
           "startIndex={} should be less than or equal to endIndex={}", startIndex, endIndex);
@@ -1107,7 +1132,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
 
     long newEndIndex;
-    if (endIndex - startIndex > maxNumberOfLogsPerFetchOnDisk) {
+    if (limitBatch && endIndex - startIndex > maxNumberOfLogsPerFetchOnDisk) {
       newEndIndex = startIndex + maxNumberOfLogsPerFetchOnDisk;
     } else {
       newEndIndex = endIndex;
@@ -1152,15 +1177,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   public long getOffsetAccordingToLogIndex(long logIndex) {
     long offset = -1;
 
-    long maxLogIndex = firstLogIndex + logIndexOffsetList.size();
-    if (logIndex >= maxLogIndex) {
-      logger.error(
-          "given log index={} exceed the max log index={}, firstLogIndex={}",
-          logIndex,
-          maxLogIndex,
-          firstLogIndex);
-      return -1;
-    }
     // 1. first find in memory
     if (logIndex >= firstLogIndex) {
       for (Pair<Long, Long> indexOffset : logIndexOffsetList) {
@@ -1206,9 +1222,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /**
    * @param startIndex the log start index
-   * @param endIndex the log end index
+   * @param endIndex   the log end index
    * @return first value-> the log data file, second value-> the left value is the start offset of
-   *     the file, the right is the end offset of the file
+   * the file, the right is the end offset of the file
    */
   private List<Pair<File, Pair<Long, Long>>> getLogDataFileAndOffset(
       long startIndex, long endIndex) {
@@ -1234,7 +1250,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       long endOffset = getOffsetAccordingToLogIndex(endIndexInOneFile);
       fileNameWithStartAndEndOffset.add(
           new Pair<>(logDataFileWithStartAndEndLogIndex.left, new Pair<>(startOffset, endOffset)));
-
       logger.debug(
           "get log data offset=[{},{}] according to log index=[{},{}], file={}",
           startOffset,
@@ -1242,38 +1257,41 @@ public class SyncLogDequeSerializer implements StableEntryManager {
           startIndexInOneFile,
           endIndexInOneFile,
           logDataFileWithStartAndEndLogIndex.left);
+      logDataFileWithStartAndEndLogIndex = null;
+
       // 4. search the next file to get the log index of fileEndLogIndex + 1
       startIndexInOneFile = endIndexInOneFile + 1;
       startOffset = getOffsetAccordingToLogIndex(startIndexInOneFile);
       if (startOffset == -1) {
-        return Collections.emptyList();
+        break;
       }
       logDataFileWithStartAndEndLogIndex = getLogDataFile(startIndexInOneFile);
-      if (logDataFileWithStartAndEndLogIndex == null) {
-        return Collections.emptyList();
-      }
       endIndexInOneFile = logDataFileWithStartAndEndLogIndex.right.right;
     }
-    // this means the endIndex's offset can not be found in the file
-    // logDataFileWithStartAndEndLogIndex.left
-    long endOffset = getOffsetAccordingToLogIndex(endIndex);
-    fileNameWithStartAndEndOffset.add(
-        new Pair<>(logDataFileWithStartAndEndLogIndex.left, new Pair<>(startOffset, endOffset)));
-    logger.debug(
-        "get log data offset=[{},{}] according to log index=[{},{}], file={}",
-        startOffset,
-        endOffset,
-        startIndexInOneFile,
-        endIndex,
-        logDataFileWithStartAndEndLogIndex.left);
+
+    if (logDataFileWithStartAndEndLogIndex != null) {
+      // this means the endIndex's offset can not be found in the file
+      // logDataFileWithStartAndEndLogIndex.left
+      long endOffset = getOffsetAccordingToLogIndex(
+          Math.min(endIndex, logDataFileWithStartAndEndLogIndex.right.right));
+      fileNameWithStartAndEndOffset.add(
+          new Pair<>(logDataFileWithStartAndEndLogIndex.left, new Pair<>(startOffset, endOffset)));
+      logger.debug(
+          "get log data offset=[{},{}] according to log index=[{},{}], file={}",
+          startOffset,
+          endOffset,
+          startIndexInOneFile,
+          endIndex,
+          logDataFileWithStartAndEndLogIndex.left);
+    }
     return fileNameWithStartAndEndOffset;
   }
 
   /**
    * @param startIndex the start log index
    * @return the first value of the pair is the log index file which contains the start index; the
-   *     second pair's first value is the file's start log index. the second pair's second value is
-   *     the file's end log index. null if not found
+   * second pair's first value is the file's start log index. the second pair's second value is the
+   * file's end log index. null if not found
    */
   public IndexFileDescriptor getLogIndexFile(long startIndex) {
     for (IndexFileDescriptor descriptor : logIndexFileList) {
@@ -1288,8 +1306,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * @param startIndex the start log index
    * @return the first value of the pair is the log data file which contains the start index; the
-   *     second pair's first value is the file's start log index. the second pair's second value is
-   *     the file's end log index. null if not found
+   * second pair's first value is the file's start log index. the second pair's second value is the
+   * file's end log index. null if not found
    */
   public Pair<File, Pair<Long, Long>> getLogDataFile(long startIndex) {
     for (File file : logDataFileList) {
@@ -1308,9 +1326,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   /**
-   * @param file the log data file
+   * @param file              the log data file
    * @param startAndEndOffset the left value is the start offset of the file, the right is the end
-   *     offset of the file
+   *                          offset of the file
    * @return the logs between start offset and end offset
    */
   private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long> startAndEndOffset) {
@@ -1403,7 +1421,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
        * automatically increased by saveInterval to avoid conflicts.
        */
       private static long saveInterval = 100;
-      /** time partition id to dividing time series into different storage group */
+      /**
+       * time partition id to dividing time series into different storage group
+       */
       private long timePartitionId;
 
       private long prevVersion;
@@ -1417,7 +1437,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         restore();
       }
 
-      /** only used for upgrading */
+      /**
+       * only used for upgrading
+       */
       public SimpleFileVersionController(String directoryPath) throws IOException {
         this.directoryPath = directoryPath + File.separator + UPGRADE_DIR;
         restore();
@@ -1480,7 +1502,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         prevVersion = currVersion;
       }
 
-      /** recovery from disk */
+      /**
+       * recovery from disk
+       */
       @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
       private void restore() throws IOException {
         File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
index 777dec7866..57db4e731b 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
@@ -184,72 +184,4 @@ public class ReplicateTest {
     Assert.assertEquals(CHECK_POINT - 1, servers.get(1).getMember(gid).getAppliedIndex());
     Assert.assertEquals(CHECK_POINT - 1, servers.get(2).getMember(gid).getAppliedIndex());
   }
-
-  /**
-   * First, suspend one node to test that the request replication between the two alive nodes is ok,
-   * then restart all nodes to lose state in the queue, and test using WAL replication to make all
-   * nodes finally consistent
-   */
-  @Test
-  public void Replicate2NodeTest() throws IOException, InterruptedException {
-    logger.info("Start ReplicateUsingWALTest");
-    servers.get(0).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(1).createPeer(group.getGroupId(), group.getPeers());
-
-    Assert.assertEquals(-1, servers.get(0).getMember(gid).getLastIndex());
-    Assert.assertEquals(-1, servers.get(1).getMember(gid).getLastIndex());
-
-    for (int i = 0; i < CHECK_POINT; i++) {
-      servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
-    }
-
-    for (int i = 0; i < 2; i++) {
-      long start = System.currentTimeMillis();
-      // should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
-      // replicating all entries
-      while (servers.get(i).getMember(gid).getAppliedIndex() < CHECK_POINT - 1) {
-        long current = System.currentTimeMillis();
-        if ((current - start) > 60 * 1000) {
-          logger.error("{}", servers.get(i).getMember(gid).getAppliedIndex());
-          Assert.fail("Unable to replicate entries");
-        }
-        Thread.sleep(100);
-      }
-    }
-
-    Assert.assertEquals(CHECK_POINT - 1, servers.get(0).getMember(gid).getAppliedIndex());
-    Assert.assertEquals(CHECK_POINT - 1, servers.get(1).getMember(gid).getAppliedIndex());
-
-    stopServer();
-    initServer();
-
-    servers.get(2).createPeer(group.getGroupId(), group.getPeers());
-
-    Assert.assertEquals(peers, servers.get(0).getMember(gid).getConfiguration());
-    Assert.assertEquals(peers, servers.get(1).getMember(gid).getConfiguration());
-    Assert.assertEquals(peers, servers.get(2).getMember(gid).getConfiguration());
-
-    Assert.assertEquals(CHECK_POINT - 1, servers.get(0).getMember(gid).getLastIndex());
-    Assert.assertEquals(CHECK_POINT - 1, servers.get(1).getMember(gid).getLastIndex());
-    Assert.assertEquals(-1, servers.get(2).getMember(gid).getLastIndex());
-
-    for (int i = 0; i < 3; i++) {
-      long start = System.currentTimeMillis();
-      while (servers.get(i).getMember(gid).getAppliedIndex() < CHECK_POINT - 1) {
-        long current = System.currentTimeMillis();
-        if ((current - start) > 60 * 1000) {
-          logger.error("{}", servers.get(i).getMember(gid).getAppliedIndex());
-          Assert.fail("Unable to replicate entries");
-        }
-        Thread.sleep(100);
-      }
-    }
-
-    Assert.assertEquals(CHECK_POINT, stateMachines.get(0).getRequestSet().size());
-    Assert.assertEquals(CHECK_POINT, stateMachines.get(1).getRequestSet().size());
-    Assert.assertEquals(CHECK_POINT, stateMachines.get(2).getRequestSet().size());
-
-    Assert.assertEquals(stateMachines.get(0).read(null), stateMachines.get(1).read(null));
-    Assert.assertEquals(stateMachines.get(2).read(null), stateMachines.get(1).read(null));
-  }
 }
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
index 69ae1bd783..b22a465c69 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
@@ -66,6 +66,9 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   @Override
   public IConsensusRequest deserializeRequest(IConsensusRequest request) {
+    if (request instanceof TestEntry) {
+      return request;
+    }
     ByteBufferConsensusRequest byteBufferConsensusRequest = (ByteBufferConsensusRequest) request;
     ByteBuffer byteBuffer = byteBufferConsensusRequest.serializeToByteBuffer();
     byteBuffer.rewind();
diff --git a/thrift-raft/pom.xml b/thrift-raft/pom.xml
index 45994aeaa9..10548f21cd 100644
--- a/thrift-raft/pom.xml
+++ b/thrift-raft/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.iotdb</groupId>
         <artifactId>iotdb-parent</artifactId>
-        <version>1.1.0-SNAPSHOT</version>
+        <version>1.2.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>iotdb-thrift-raft-consensus</artifactId>