You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/04/10 04:34:02 UTC

[incubator-iotdb] branch cluster_data_snapshot created (now 3bcc1ba)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch cluster_data_snapshot
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 3bcc1ba  first commit

This branch includes the following new commits:

     new 3bcc1ba  first commit

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: first commit

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_data_snapshot
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 3bcc1ba369267fb26f6d47c687b092834c3e89a5
Author: jt2594838 <jt...@163.com>
AuthorDate: Fri Apr 10 12:33:51 2020 +0800

    first commit
---
 .../org/apache/iotdb/cluster/log/LogManager.java   |  7 ---
 .../manage/FilePartitionedSnapshotLogManager.java  | 71 +++++++++++-----------
 .../cluster/log/snapshot/PullSnapshotTask.java     |  2 +-
 .../cluster/server/member/DataGroupMember.java     | 31 +++++++---
 .../iotdb/cluster/server/member/RaftMember.java    |  2 -
 .../cluster/server/member/DataGroupMemberTest.java |  2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 11 ++++
 .../db/engine/modification/ModificationFile.java   | 40 +++++++++++-
 .../engine/storagegroup/StorageGroupProcessor.java | 10 +++
 .../db/engine/storagegroup/TsFileResource.java     | 66 ++++++++++++++++++++
 .../version/SimpleFileVersionController.java       |  8 +++
 .../engine/version/SysTimeVersionController.java   |  7 +++
 .../iotdb/db/engine/version/VersionController.java |  8 +++
 .../iotdb/db/writelog/recover/LogReplayerTest.java |  5 ++
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |  5 ++
 .../writelog/recover/UnseqTsFileRecoverTest.java   |  5 ++
 16 files changed, 227 insertions(+), 53 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogManager.java
index 3f1a4bd..90b20bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogManager.java
@@ -80,11 +80,4 @@ public interface LogManager {
   void setLastLogId(long lastLogId);
 
   void setLastLogTerm(long lastLogTerm);
-
-  /**
-   * Wait until all remote snapshots are pulled locally.
-   */
-  default void waitRemoteSnapshots() {
-
-  };
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 293fe34..dfeaca1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -20,14 +20,13 @@
 package org.apache.iotdb.cluster.log.manage;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
-import org.apache.iotdb.cluster.log.snapshot.RemoteSnapshot;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
@@ -52,20 +51,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
   }
 
   @Override
-  public void waitRemoteSnapshots() {
-    synchronized (slotSnapshots) {
-      for (Entry<Integer, FileSnapshot> entry : slotSnapshots.entrySet()) {
-        if (entry.getValue() instanceof RemoteSnapshot) {
-          ((RemoteSnapshot) entry.getValue()).getRemoteSnapshot();
-        }
-      }
-    }
-  }
-
-  @Override
   public void takeSnapshot() throws IOException {
-    // make sure every remote snapshot is pulled before creating local snapshot
-    waitRemoteSnapshots();
 
     logger.info("Taking snapshots, flushing IoTDB");
     StorageEngine.getInstance().syncCloseAllProcessor();
@@ -90,31 +76,48 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
   }
 
   private void collectTsFiles() throws IOException {
-    slotSnapshots.clear();
     // TODO-Cluster#349: the collection is re-collected each time to prevent inconsistency when
     //  some of them are removed during two snapshots. Incremental addition or removal may be
     //  used to optimize
 
-    Map<String, Map<Long, List<TsFileResource>>> allClosedStorageGroupTsFile = StorageEngine
-        .getInstance().getAllClosedStorageGroupTsFile();
-    for (Entry<String, Map<Long, List<TsFileResource>>> entry :
-        allClosedStorageGroupTsFile.entrySet()) {
-      String storageGroupName = entry.getKey();
-      Map<Long, List<TsFileResource>> storageGroupsFiles = entry.getValue();
-      for (Entry<Long, List<TsFileResource>> storageGroupFiles : storageGroupsFiles.entrySet()) {
-        Long partitionNum = storageGroupFiles.getKey();
-        int slotNum = PartitionUtils.calculateStorageGroupSlotByPartition(storageGroupName,
-            partitionNum, partitionTable.getTotalSlotNumbers());
-        FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum,
-            s -> new FileSnapshot());
-        if (snapshot.getTimeseriesSchemas().isEmpty()) {
-          snapshot.setTimeseriesSchemas(slotTimeseries.getOrDefault(slotNum,
-              Collections.emptySet()));
-        }
-        for (TsFileResource tsFileResource : storageGroupFiles.getValue()) {
-          snapshot.addFile(tsFileResource, header);
+    startCollect:
+    while (true) {
+      slotSnapshots.clear();
+      Map<String, Map<Long, List<TsFileResource>>> allClosedStorageGroupTsFile = StorageEngine
+          .getInstance().getAllClosedStorageGroupTsFile();
+      List<TsFileResource> createdHardlinks = new ArrayList<>();
+      // group the TsFiles by their slots
+      for (Entry<String, Map<Long, List<TsFileResource>>> entry :
+          allClosedStorageGroupTsFile.entrySet()) {
+        String storageGroupName = entry.getKey();
+        Map<Long, List<TsFileResource>> storageGroupsFiles = entry.getValue();
+        for (Entry<Long, List<TsFileResource>> storageGroupFiles : storageGroupsFiles.entrySet()) {
+          Long partitionNum = storageGroupFiles.getKey();
+          int slotNum = PartitionUtils.calculateStorageGroupSlotByPartition(storageGroupName,
+              partitionNum, partitionTable.getTotalSlotNumbers());
+          FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum,
+              s -> new FileSnapshot());
+          if (snapshot.getTimeseriesSchemas().isEmpty()) {
+            snapshot.setTimeseriesSchemas(slotTimeseries.getOrDefault(slotNum,
+                Collections.emptySet()));
+          }
+
+          for (TsFileResource tsFileResource : storageGroupFiles.getValue()) {
+            TsFileResource hardlink = tsFileResource.createHardlink();
+            if (hardlink == null) {
+              // a file was deleted during collection, clean up the created hardlinks and restart
+              // from the beginning
+              for (TsFileResource createdHardlink : createdHardlinks) {
+                createdHardlink.remove();
+              }
+              continue startCollect;
+            }
+            createdHardlinks.add(hardlink);
+            snapshot.addFile(hardlink, header);
+          }
         }
       }
+      break;
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index e8a5a08..2a077b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -96,7 +96,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
         }
         for (Entry<Integer, T> entry : result.entrySet()) {
           try {
-            newMember.applySnapshot(entry.getValue());
+            newMember.applySnapshot(entry.getValue(), entry.getKey());
           } catch (SnapshotApplicationException e) {
             logger.error("Apply snapshot failed, retry...", e);
             Thread.sleep(ClusterConstant.PULL_SNAPSHOT_RETRY_INTERVAL);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 32f98eb..8899252 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -369,7 +369,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
    * supported in the future.
    * @param snapshot
    */
-  public void applySnapshot(Snapshot snapshot) throws SnapshotApplicationException {
+  public void applySnapshot(Snapshot snapshot, int slot) throws SnapshotApplicationException {
     logger.debug("{}: applying snapshot {}", name, snapshot);
     if (snapshot instanceof FileSnapshot) {
       try {
@@ -389,7 +389,8 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
    * overlap with existing files.
    * @param snapshot
    */
-  private void applyFileSnapshot(FileSnapshot snapshot) throws PullFileException {
+  private void applyFileSnapshot(FileSnapshot snapshot)
+      throws PullFileException, SnapshotApplicationException {
     synchronized (logManager) {
       // load metadata in the snapshot
       for (MeasurementSchema schema : snapshot.getTimeseriesSchemas()) {
@@ -400,6 +401,19 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       // load data in the snapshot
       // TODO-Cluster: deal with the failure of pulling a file
       List<RemoteTsFileResource> remoteTsFileResources = snapshot.getDataFiles();
+      // set partition versions
+      for (RemoteTsFileResource remoteTsFileResource : remoteTsFileResources) {
+        String[] pathSegments = FilePathUtils.splitTsFilePath(remoteTsFileResource);
+        int segSize = pathSegments.length;
+        String storageGroupName = pathSegments[segSize - 3];
+        try {
+          StorageEngine.getInstance().setPartitionVersionToMax(storageGroupName,
+              remoteTsFileResource.getTimePartition(), remoteTsFileResource.getMaxVersion());
+        } catch (StorageEngineException | IOException e) {
+          throw new SnapshotApplicationException(e);
+        }
+      }
+      // pull file
       for (RemoteTsFileResource resource : remoteTsFileResources) {
         if (!isFileAlreadyPulled(resource)) {
           loadRemoteFile(resource);
@@ -438,7 +452,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
       for (Integer slot : slots) {
         Snapshot subSnapshot = snapshot.getSnapshot(slot);
         if (subSnapshot != null) {
-          applySnapshot(subSnapshot);
+          applySnapshot(subSnapshot, slot);
         }
       }
       logManager.setLastLogId(snapshot.getLastLogId());
@@ -524,12 +538,15 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
     int segSize = pathSegments.length;
     // the new file is stored at:
     // remote/{nodeIdentifier}/{storageGroupName}/{partitionNum}/{fileName}
-    String tempFileName =
+    // the file in the snapshot is a hardlink, remove the hardlink suffix
+    String tempFileName = pathSegments[segSize - 1].substring(0,
+        pathSegments[segSize - 1].lastIndexOf('.'));
+    String tempFilePath =
         node.getNodeIdentifier() + File.separator + pathSegments[segSize - 3] +
-            File.separator + pathSegments[segSize - 2] + File.separator + pathSegments[segSize - 1];
-    File tempFile = new File(REMOTE_FILE_TEMP_DIR, tempFileName);
+            File.separator + pathSegments[segSize - 2] + File.separator + tempFileName;
+    File tempFile = new File(REMOTE_FILE_TEMP_DIR, tempFilePath);
     tempFile.getParentFile().mkdirs();
-    File tempModFile = new File(REMOTE_FILE_TEMP_DIR, tempFileName + ModificationFile.FILE_SUFFIX);
+    File tempModFile = new File(REMOTE_FILE_TEMP_DIR, tempFilePath + ModificationFile.FILE_SUFFIX);
     if (pullRemoteFile(resource.getFile().getAbsolutePath(), node, tempFile)) {
       if (!checkMd5(tempFile, resource.getMd5())) {
         logger.error("The downloaded file of {} does not have the right MD5", resource);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index e0a11fc..ccd8bd7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -870,8 +870,6 @@ public abstract class RaftMember implements RaftService.AsyncIface {
    * @return true if the node has caught up, false otherwise
    */
   public boolean syncLeader() {
-    // make sure all snapshot pulling are done, otherwise some data will remain in the old nodes
-    logManager.waitRemoteSnapshots();
 
     if (character == NodeCharacter.LEADER) {
       return true;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index a888007..038ce78 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -344,7 +344,7 @@ public class DataGroupMemberTest extends MemberTest {
     processor.insert(insertPlan);
     processor.asyncCloseAllWorkingTsFileProcessors();
 
-    dataGroupMember.applySnapshot(snapshot);
+    dataGroupMember.applySnapshot(snapshot, 0);
     assertEquals(3, processor.getSequenceFileTreeSet().size());
     assertEquals(0, processor.getUnSequenceFileList().size());
     Deletion deletion = new Deletion(new Path(TestUtils.getTestSg(0)), 0, 0);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1d734e4..3fdbe18 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -553,4 +553,15 @@ public class StorageEngine implements IService {
   public static long getTimePartition(long time) {
     return time / timePartitionInterval;
   }
+
+  /**
+   * Set the version of given partition to newMaxVersion if it is larger than the current version.
+   * @param storageGroup
+   * @param partitionId
+   * @param newMaxVersion
+   */
+  public void setPartitionVersionToMax(String storageGroup, long partitionId, long newMaxVersion)
+      throws StorageEngineException, IOException {
+    getProcessor(storageGroup).setPartitionVersionToMax(partitionId, newMaxVersion);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index 43bf6b0..b21d1aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -19,15 +19,21 @@
 
 package org.apache.iotdb.db.engine.modification;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
+import java.util.Random;
 import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
 import org.apache.iotdb.db.engine.modification.io.ModificationReader;
 import org.apache.iotdb.db.engine.modification.io.ModificationWriter;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same
@@ -35,6 +41,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
  */
 public class ModificationFile implements AutoCloseable {
 
+  private static final Logger logger = LoggerFactory.getLogger(ModificationFile.class);
   public static final String FILE_SUFFIX = ".mods";
 
   private List<Modification> modifications;
@@ -125,4 +132,35 @@ public class ModificationFile implements AutoCloseable {
     FSFactoryProducer.getFSFactory().getFile(filePath).delete();
   }
 
+  public boolean exists() {
+    return new File(filePath).exists();
+  }
+
+  /**
+   * Create a hardlink for the modification file.
+   * The hardlink will have a suffix like ".{sysTime}_{randomLong}"
+   * @return a new ModificationFile with its path changed to the hardlink, or null if the origin
+   * file does not exist or the hardlink cannot be created.
+   */
+  public ModificationFile createHardlink() {
+    if (!exists()) {
+      return null;
+    }
+
+    Random random = new Random();
+    while (true) {
+      String hardlinkSuffix = "." + System.currentTimeMillis() + "_" + random.nextLong();
+      File hardlink = new File(filePath + hardlinkSuffix);
+
+      try {
+        Files.createLink(Paths.get(hardlink.getAbsolutePath()), Paths.get(filePath));
+        return new ModificationFile(hardlink.getAbsolutePath());
+      } catch (FileAlreadyExistsException e) {
+        // retry a different name if the file is already created
+      } catch (IOException e) {
+        logger.error("Cannot create hardlink for {}", filePath, e);
+        return null;
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1074e6c..e9ab6c2 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1580,6 +1580,16 @@ public class StorageGroupProcessor {
   }
 
   /**
+   * Set the version in "partition" to "version" if "version" is larger than the current version.
+   * @param partition
+   * @param version
+   * @throws IOException
+   */
+  public void setPartitionVersionToMax(long partition, long version) throws IOException {
+    getVersionControllerByTimePartitionId(partition).setVersionToMax(version);
+  }
+
+  /**
    * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
    * @param newTsFileResource
    * @param newFilePartitionId
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 9e93f30..bab0d53 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -217,6 +220,10 @@ public class TsFileResource {
           ReadWriteIOUtils.write(historicalVersion, outputStream);
         }
       }
+
+      if (modFile != null && modFile.exists()) {
+        ReadWriteIOUtils.write(modFile.getFilePath(), outputStream);
+      }
     }
     File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
     File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
@@ -255,6 +262,10 @@ public class TsFileResource {
         long version = Long.parseLong(file.getName().split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
         historicalVersions = Collections.singleton(version);
       }
+
+      if (inputStream.available() > 0) {
+        modFile = new ModificationFile(ReadWriteIOUtils.readString(inputStream));
+      }
     }
   }
 
@@ -521,4 +532,59 @@ public class TsFileResource {
     }
     return partitionId;
   }
+
+  /**
+   * Create a hardlink for the TsFile and modification file (if exists)
+   * The hardlink will have a suffix like ".{sysTime}_{randomLong}"
+   * @return a new TsFileResource with its file changed to the hardlink, or null if the file does
+   * not exist or the hardlink cannot be created.
+   */
+  public TsFileResource createHardlink() {
+    if (!file.exists()) {
+      return null;
+    }
+
+    TsFileResource newResource;
+    try {
+      newResource = new TsFileResource(this);
+    } catch (IOException e) {
+      logger.error("Cannot create hardlink for {}", file, e);
+      return null;
+    }
+
+    Random random = new Random();
+    while (true) {
+      String hardlinkSuffix = "." + System.currentTimeMillis() + "_" + random.nextLong();
+      File hardlink = new File(file.getAbsolutePath() + hardlinkSuffix);
+
+      try {
+        Files.createLink(Paths.get(hardlink.getAbsolutePath()), Paths.get(file.getAbsolutePath()));
+        newResource.setFile(hardlink);
+        if (modFile != null && modFile.exists()) {
+          newResource.setModFile(modFile.createHardlink());
+        }
+        break;
+      } catch (FileAlreadyExistsException e) {
+        // retry a different name if the file is already created
+      } catch (IOException e) {
+        logger.error("Cannot create hardlink for {}", file, e);
+        return null;
+      }
+    }
+    return newResource;
+  }
+
+  public void setModFile(ModificationFile modFile) {
+    this.modFile = modFile;
+  }
+
+  public long getMaxVersion() {
+    long maxVersion = 0;
+    if (historicalVersions != null) {
+      for (Long historicalVersion : historicalVersions) {
+        maxVersion = Math.max(historicalVersion, maxVersion);
+      }
+    }
+    return maxVersion;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 95c14b8..e3dbe08 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -92,6 +92,14 @@ public class SimpleFileVersionController implements VersionController {
     return currVersion;
   }
 
+  @Override
+  public void setVersionToMax(long version) throws IOException {
+    if (version > currVersion) {
+      currVersion = version;
+      persist();
+    }
+  }
+
   private void checkPersist() throws IOException {
     if ((currVersion - prevVersion) >= saveInterval) {
       persist();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
index 3e47cd9..2adb374 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.engine.version;
 
+import java.io.IOException;
+
 /**
  * SysTimeVersionController uses system timestamp as the version number.
  */
@@ -39,4 +41,9 @@ public class SysTimeVersionController implements VersionController {
   public long currVersion() {
     return System.currentTimeMillis();
   }
+
+  @Override
+  public void setVersionToMax(long version) {
+
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
index 30c93c6..d2b01c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.engine.version;
 
+import java.io.IOException;
+
 /**
  * VersionController controls the version(a monotonic increasing long) of a FileNode.
  */
@@ -34,4 +36,10 @@ public interface VersionController {
    * @return the current version number.
    */
   long currVersion();
+
+  /**
+   * Set current version to the given number if it is larger than the current version.
+   * @param version
+   */
+  void setVersionToMax(long version) throws IOException;
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 77cfac1..101a58b 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -84,6 +84,11 @@ public class LogReplayerTest {
       public long currVersion() {
         return 5;
       }
+
+      @Override
+      public void setVersionToMax(long version) {
+
+      }
     };
     TsFileResource tsFileResource = new TsFileResource(tsFile);
     IMemTable memTable = new PrimitiveMemTable();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 3c2a38b..f1811db 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -85,6 +85,11 @@ public class SeqTsFileRecoverTest {
     public long currVersion() {
       return i;
     }
+
+    @Override
+    public void setVersionToMax(long version) {
+
+    }
   };
 
   @Before
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index a1ae8bd..1bf402d 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -82,6 +82,11 @@ public class UnseqTsFileRecoverTest {
     public long currVersion() {
       return i;
     }
+
+    @Override
+    public void setVersionToMax(long version) {
+
+    }
   };
 
   @Before