You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/04/10 04:34:03 UTC

[incubator-iotdb] 01/01: first commit

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_data_snapshot
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 3bcc1ba369267fb26f6d47c687b092834c3e89a5
Author: jt2594838 <jt...@163.com>
AuthorDate: Fri Apr 10 12:33:51 2020 +0800

    first commit
---
 .../org/apache/iotdb/cluster/log/LogManager.java   |  7 ---
 .../manage/FilePartitionedSnapshotLogManager.java  | 71 +++++++++++-----------
 .../cluster/log/snapshot/PullSnapshotTask.java     |  2 +-
 .../cluster/server/member/DataGroupMember.java     | 31 +++++++---
 .../iotdb/cluster/server/member/RaftMember.java    |  2 -
 .../cluster/server/member/DataGroupMemberTest.java |  2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 11 ++++
 .../db/engine/modification/ModificationFile.java   | 40 +++++++++++-
 .../engine/storagegroup/StorageGroupProcessor.java | 10 +++
 .../db/engine/storagegroup/TsFileResource.java     | 66 ++++++++++++++++++++
 .../version/SimpleFileVersionController.java       |  8 +++
 .../engine/version/SysTimeVersionController.java   |  7 +++
 .../iotdb/db/engine/version/VersionController.java |  8 +++
 .../iotdb/db/writelog/recover/LogReplayerTest.java |  5 ++
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |  5 ++
 .../writelog/recover/UnseqTsFileRecoverTest.java   |  5 ++
 16 files changed, 227 insertions(+), 53 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogManager.java
index 3f1a4bd..90b20bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogManager.java
@@ -80,11 +80,4 @@ public interface LogManager {
   void setLastLogId(long lastLogId);
 
   void setLastLogTerm(long lastLogTerm);
-
-  /**
-   * Wait until all remote snapshots are pulled locally.
-   */
-  default void waitRemoteSnapshots() {
-
-  };
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 293fe34..dfeaca1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -20,14 +20,13 @@
 package org.apache.iotdb.cluster.log.manage;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
-import org.apache.iotdb.cluster.log.snapshot.RemoteSnapshot;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
@@ -52,20 +51,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
   }
 
   @Override
-  public void waitRemoteSnapshots() {
-    synchronized (slotSnapshots) {
-      for (Entry<Integer, FileSnapshot> entry : slotSnapshots.entrySet()) {
-        if (entry.getValue() instanceof RemoteSnapshot) {
-          ((RemoteSnapshot) entry.getValue()).getRemoteSnapshot();
-        }
-      }
-    }
-  }
-
-  @Override
   public void takeSnapshot() throws IOException {
-    // make sure every remote snapshot is pulled before creating local snapshot
-    waitRemoteSnapshots();
 
     logger.info("Taking snapshots, flushing IoTDB");
     StorageEngine.getInstance().syncCloseAllProcessor();
@@ -90,31 +76,48 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
   }
 
   private void collectTsFiles() throws IOException {
-    slotSnapshots.clear();
     // TODO-Cluster#349: the collection is re-collected each time to prevent inconsistency when
     //  some of them are removed during two snapshots. Incremental addition or removal may be
     //  used to optimize
 
-    Map<String, Map<Long, List<TsFileResource>>> allClosedStorageGroupTsFile = StorageEngine
-        .getInstance().getAllClosedStorageGroupTsFile();
-    for (Entry<String, Map<Long, List<TsFileResource>>> entry :
-        allClosedStorageGroupTsFile.entrySet()) {
-      String storageGroupName = entry.getKey();
-      Map<Long, List<TsFileResource>> storageGroupsFiles = entry.getValue();
-      for (Entry<Long, List<TsFileResource>> storageGroupFiles : storageGroupsFiles.entrySet()) {
-        Long partitionNum = storageGroupFiles.getKey();
-        int slotNum = PartitionUtils.calculateStorageGroupSlotByPartition(storageGroupName,
-            partitionNum, partitionTable.getTotalSlotNumbers());
-        FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum,
-            s -> new FileSnapshot());
-        if (snapshot.getTimeseriesSchemas().isEmpty()) {
-          snapshot.setTimeseriesSchemas(slotTimeseries.getOrDefault(slotNum,
-              Collections.emptySet()));
-        }
-        for (TsFileResource tsFileResource : storageGroupFiles.getValue()) {
-          snapshot.addFile(tsFileResource, header);
+    startCollect:
+    while (true) {
+      slotSnapshots.clear();
+      Map<String, Map<Long, List<TsFileResource>>> allClosedStorageGroupTsFile = StorageEngine
+          .getInstance().getAllClosedStorageGroupTsFile();
+      List<TsFileResource> createdHardlinks = new ArrayList<>();
+      // group the TsFiles by their slots
+      for (Entry<String, Map<Long, List<TsFileResource>>> entry :
+          allClosedStorageGroupTsFile.entrySet()) {
+        String storageGroupName = entry.getKey();
+        Map<Long, List<TsFileResource>> storageGroupsFiles = entry.getValue();
+        for (Entry<Long, List<TsFileResource>> storageGroupFiles : storageGroupsFiles.entrySet()) {
+          Long partitionNum = storageGroupFiles.getKey();
+          int slotNum = PartitionUtils.calculateStorageGroupSlotByPartition(storageGroupName,
+              partitionNum, partitionTable.getTotalSlotNumbers());
+          FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum,
+              s -> new FileSnapshot());
+          if (snapshot.getTimeseriesSchemas().isEmpty()) {
+            snapshot.setTimeseriesSchemas(slotTimeseries.getOrDefault(slotNum,
+                Collections.emptySet()));
+          }
+
+          for (TsFileResource tsFileResource : storageGroupFiles.getValue()) {
+            TsFileResource hardlink = tsFileResource.createHardlink();
+            if (hardlink == null) {
+              // some file is deleted during the collecting, clean created hardlinks and restart
+              // from the beginning
+              for (TsFileResource createdHardlink : createdHardlinks) {
+                createdHardlink.remove();
+              }
+              continue startCollect;
+            }
+            createdHardlinks.add(hardlink);
+            snapshot.addFile(hardlink, header);
+          }
         }
       }
+      break;
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index e8a5a08..2a077b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -96,7 +96,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
         }
         for (Entry<Integer, T> entry : result.entrySet()) {
           try {
-            newMember.applySnapshot(entry.getValue());
+            newMember.applySnapshot(entry.getValue(), entry.getKey());
           } catch (SnapshotApplicationException e) {
             logger.error("Apply snapshot failed, retry...", e);
             Thread.sleep(ClusterConstant.PULL_SNAPSHOT_RETRY_INTERVAL);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 32f98eb..8899252 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -369,7 +369,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
    * supported in the future.
    * @param snapshot
    */
-  public void applySnapshot(Snapshot snapshot) throws SnapshotApplicationException {
+  public void applySnapshot(Snapshot snapshot, int slot) throws SnapshotApplicationException {
     logger.debug("{}: applying snapshot {}", name, snapshot);
     if (snapshot instanceof FileSnapshot) {
       try {
@@ -389,7 +389,8 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
    * overlap with existing files.
    * @param snapshot
    */
-  private void applyFileSnapshot(FileSnapshot snapshot) throws PullFileException {
+  private void applyFileSnapshot(FileSnapshot snapshot)
+      throws PullFileException, SnapshotApplicationException {
     synchronized (logManager) {
       // load metadata in the snapshot
       for (MeasurementSchema schema : snapshot.getTimeseriesSchemas()) {
@@ -400,6 +401,19 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       // load data in the snapshot
       // TODO-Cluster: deal with the failure of pulling a file
       List<RemoteTsFileResource> remoteTsFileResources = snapshot.getDataFiles();
+      // set partition versions
+      for (RemoteTsFileResource remoteTsFileResource : remoteTsFileResources) {
+        String[] pathSegments = FilePathUtils.splitTsFilePath(remoteTsFileResource);
+        int segSize = pathSegments.length;
+        String storageGroupName = pathSegments[segSize - 3];
+        try {
+          StorageEngine.getInstance().setPartitionVersionToMax(storageGroupName,
+              remoteTsFileResource.getTimePartition(), remoteTsFileResource.getMaxVersion());
+        } catch (StorageEngineException | IOException e) {
+          throw new SnapshotApplicationException(e);
+        }
+      }
+      // pull file
       for (RemoteTsFileResource resource : remoteTsFileResources) {
         if (!isFileAlreadyPulled(resource)) {
           loadRemoteFile(resource);
@@ -438,7 +452,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       for (Integer slot : slots) {
         Snapshot subSnapshot = snapshot.getSnapshot(slot);
         if (subSnapshot != null) {
-          applySnapshot(subSnapshot);
+          applySnapshot(subSnapshot, slot);
         }
       }
       logManager.setLastLogId(snapshot.getLastLogId());
@@ -524,12 +538,15 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
     int segSize = pathSegments.length;
     // the new file is stored at:
     // remote/{nodeIdentifier}/{storageGroupName}/{partitionNum}/{fileName}
-    String tempFileName =
+    // the file in the snapshot is a hardlink, remove the hardlink suffix
+    String tempFileName = pathSegments[segSize - 1].substring(0,
+        pathSegments[segSize - 1].lastIndexOf('.'));
+    String tempFilePath =
         node.getNodeIdentifier() + File.separator + pathSegments[segSize - 3] +
-            File.separator + pathSegments[segSize - 2] + File.separator + pathSegments[segSize - 1];
-    File tempFile = new File(REMOTE_FILE_TEMP_DIR, tempFileName);
+            File.separator + pathSegments[segSize - 2] + File.separator + tempFileName;
+    File tempFile = new File(REMOTE_FILE_TEMP_DIR, tempFilePath);
     tempFile.getParentFile().mkdirs();
-    File tempModFile = new File(REMOTE_FILE_TEMP_DIR, tempFileName + ModificationFile.FILE_SUFFIX);
+    File tempModFile = new File(REMOTE_FILE_TEMP_DIR, tempFilePath + ModificationFile.FILE_SUFFIX);
     if (pullRemoteFile(resource.getFile().getAbsolutePath(), node, tempFile)) {
       if (!checkMd5(tempFile, resource.getMd5())) {
         logger.error("The downloaded file of {} does not have the right MD5", resource);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index e0a11fc..ccd8bd7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -870,8 +870,6 @@ public abstract class RaftMember implements RaftService.AsyncIface {
    * @return true if the node has caught up, false otherwise
    */
   public boolean syncLeader() {
-    // make sure all snapshot pulling are done, otherwise some data will remain in the old nodes
-    logManager.waitRemoteSnapshots();
 
     if (character == NodeCharacter.LEADER) {
       return true;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index a888007..038ce78 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -344,7 +344,7 @@ public class DataGroupMemberTest extends MemberTest {
     processor.insert(insertPlan);
     processor.asyncCloseAllWorkingTsFileProcessors();
 
-    dataGroupMember.applySnapshot(snapshot);
+    dataGroupMember.applySnapshot(snapshot, 0);
     assertEquals(3, processor.getSequenceFileTreeSet().size());
     assertEquals(0, processor.getUnSequenceFileList().size());
     Deletion deletion = new Deletion(new Path(TestUtils.getTestSg(0)), 0, 0);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1d734e4..3fdbe18 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -553,4 +553,15 @@ public class StorageEngine implements IService {
   public static long getTimePartition(long time) {
     return time / timePartitionInterval;
   }
+
+  /**
+   * Set the version of given partition to newMaxVersion if it is larger than the current version.
+   * @param storageGroup
+   * @param partitionId
+   * @param newMaxVersion
+   */
+  public void setPartitionVersionToMax(String storageGroup, long partitionId, long newMaxVersion)
+      throws StorageEngineException, IOException {
+    getProcessor(storageGroup).setPartitionVersionToMax(partitionId, newMaxVersion);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index 43bf6b0..b21d1aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -19,15 +19,21 @@
 
 package org.apache.iotdb.db.engine.modification;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
+import java.util.Random;
 import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
 import org.apache.iotdb.db.engine.modification.io.ModificationReader;
 import org.apache.iotdb.db.engine.modification.io.ModificationWriter;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same
@@ -35,6 +41,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
  */
 public class ModificationFile implements AutoCloseable {
 
+  private static final Logger logger = LoggerFactory.getLogger(ModificationFile.class);
   public static final String FILE_SUFFIX = ".mods";
 
   private List<Modification> modifications;
@@ -125,4 +132,35 @@ public class ModificationFile implements AutoCloseable {
     FSFactoryProducer.getFSFactory().getFile(filePath).delete();
   }
 
+  public boolean exists() {
+    return new File(filePath).exists();
+  }
+
+  /**
+   * Create a hardlink for the modification file.
+   * The hardlink will have a suffix like ".{sysTime}_{randomLong}"
+   * @return a new ModificationFile with its path changed to the hardlink, or null if the origin
+   * file does not exist or the hardlink cannot be created.
+   */
+  public ModificationFile createHardlink() {
+    if (!exists()) {
+      return null;
+    }
+
+    Random random = new Random();
+    while (true) {
+      String hardlinkSuffix = "." + System.currentTimeMillis() + "_" + random.nextLong();
+      File hardlink = new File(filePath + hardlinkSuffix);
+
+      try {
+        Files.createLink(Paths.get(hardlink.getAbsolutePath()), Paths.get(filePath));
+        return new ModificationFile(hardlink.getAbsolutePath());
+      } catch (FileAlreadyExistsException e) {
+        // retry a different name if the file is already created
+      } catch (IOException e) {
+        logger.error("Cannot create hardlink for {}", filePath, e);
+        return null;
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1074e6c..e9ab6c2 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1580,6 +1580,16 @@ public class StorageGroupProcessor {
   }
 
   /**
+   * Set the version in "partition" to "version" if "version" is larger than the current version.
+   * @param partition
+   * @param version
+   * @throws IOException
+   */
+  public void setPartitionVersionToMax(long partition, long version) throws IOException {
+    getVersionControllerByTimePartitionId(partition).setVersionToMax(version);
+  }
+
+  /**
    * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
    * @param newTsFileResource
    * @param newFilePartitionId
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 9e93f30..bab0d53 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -217,6 +220,10 @@ public class TsFileResource {
           ReadWriteIOUtils.write(historicalVersion, outputStream);
         }
       }
+
+      if (modFile != null && modFile.exists()) {
+        ReadWriteIOUtils.write(modFile.getFilePath(), outputStream);
+      }
     }
     File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
     File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
@@ -255,6 +262,10 @@ public class TsFileResource {
         long version = Long.parseLong(file.getName().split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
         historicalVersions = Collections.singleton(version);
       }
+
+      if (inputStream.available() > 0) {
+        modFile = new ModificationFile(ReadWriteIOUtils.readString(inputStream));
+      }
     }
   }
 
@@ -521,4 +532,59 @@ public class TsFileResource {
     }
     return partitionId;
   }
+
+  /**
+   * Create a hardlink for the TsFile and modification file (if exists)
+   * The hardlink will have a suffix like ".{sysTime}_{randomLong}"
+   * @return a new TsFileResource with its file changed to the hardlink, or null if the hardlink
+   * cannot be created.
+   */
+  public TsFileResource createHardlink() {
+    if (!file.exists()) {
+      return null;
+    }
+
+    TsFileResource newResource;
+    try {
+      newResource = new TsFileResource(this);
+    } catch (IOException e) {
+      logger.error("Cannot create hardlink for {}", file, e);
+      return null;
+    }
+
+    Random random = new Random();
+    while (true) {
+      String hardlinkSuffix = "." + System.currentTimeMillis() + "_" + random.nextLong();
+      File hardlink = new File(file.getAbsolutePath() + hardlinkSuffix);
+
+      try {
+        Files.createLink(Paths.get(hardlink.getAbsolutePath()), Paths.get(file.getAbsolutePath()));
+        newResource.setFile(hardlink);
+        if (modFile != null && modFile.exists()) {
+          newResource.setModFile(modFile.createHardlink());
+        }
+        break;
+      } catch (FileAlreadyExistsException e) {
+        // retry a different name if the file is already created
+      } catch (IOException e) {
+        logger.error("Cannot create hardlink for {}", file, e);
+        return null;
+      }
+    }
+    return newResource;
+  }
+
+  public void setModFile(ModificationFile modFile) {
+    this.modFile = modFile;
+  }
+
+  public long getMaxVersion() {
+    long maxVersion = 0;
+    if (historicalVersions != null) {
+      for (Long historicalVersion : historicalVersions) {
+        maxVersion = Math.max(historicalVersion, maxVersion);
+      }
+    }
+    return maxVersion;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 95c14b8..e3dbe08 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -92,6 +92,14 @@ public class SimpleFileVersionController implements VersionController {
     return currVersion;
   }
 
+  @Override
+  public void setVersionToMax(long version) throws IOException {
+    if (version > currVersion) {
+      currVersion = version;
+      persist();
+    }
+  }
+
   private void checkPersist() throws IOException {
     if ((currVersion - prevVersion) >= saveInterval) {
       persist();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
index 3e47cd9..2adb374 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.engine.version;
 
+import java.io.IOException;
+
 /**
  * SysTimeVersionController uses system timestamp as the version number.
  */
@@ -39,4 +41,9 @@ public class SysTimeVersionController implements VersionController {
   public long currVersion() {
     return System.currentTimeMillis();
   }
+
+  @Override
+  public void setVersionToMax(long version) {
+
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
index 30c93c6..d2b01c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.engine.version;
 
+import java.io.IOException;
+
 /**
  * VersionController controls the version(a monotonic increasing long) of a FileNode.
  */
@@ -34,4 +36,10 @@ public interface VersionController {
    * @return the current version number.
    */
   long currVersion();
+
+  /**
+   * Set current version to the given number if it is larger than the current version.
+   * @param version
+   */
+  void setVersionToMax(long version) throws IOException;
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 77cfac1..101a58b 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -84,6 +84,11 @@ public class LogReplayerTest {
       public long currVersion() {
         return 5;
       }
+
+      @Override
+      public void setVersionToMax(long version) {
+
+      }
     };
     TsFileResource tsFileResource = new TsFileResource(tsFile);
     IMemTable memTable = new PrimitiveMemTable();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 3c2a38b..f1811db 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -85,6 +85,11 @@ public class SeqTsFileRecoverTest {
     public long currVersion() {
       return i;
     }
+
+    @Override
+    public void setVersionToMax(long version) {
+
+    }
   };
 
   @Before
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index a1ae8bd..1bf402d 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -82,6 +82,11 @@ public class UnseqTsFileRecoverTest {
     public long currVersion() {
       return i;
     }
+
+    @Override
+    public void setVersionToMax(long version) {
+
+    }
   };
 
   @Before