You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/01 06:31:07 UTC

[incubator-iotdb] branch master updated: Fix visible metadata, version, work processor bugs when recovering (#966)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c6c23a3  Fix visible metadata, version, work processor bugs when recovering (#966)
c6c23a3 is described below

commit c6c23a3c88145fe9f4615b6987ce6842f51b91e3
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Wed Apr 1 14:31:01 2020 +0800

    Fix visible metadata, version, work processor bugs when recovering (#966)
---
 .../main/java/org/apache/iotdb/SessionExample.java |  1 -
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  3 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 95 +++++-----------------
 .../db/engine/storagegroup/TsFileResource.java     | 49 +++++++++++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 55 ++++++-------
 .../org/apache/iotdb/db/utils/FilePathUtils.java   |  1 +
 .../writelog/recover/TsFileRecoverPerformer.java   |  7 +-
 ...yMergeTest.java => IoTDBFlushQueryMergeIT.java} |  2 +-
 ...ileTest.java => IoTDBLoadExternalTsfileIT.java} |  4 +-
 .../iotdb/db/integration/IoTDBRestartIT.java       | 87 ++++++++++++++++++++
 ...impleQueryTest.java => IoTDBSimpleQueryIT.java} |  2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  5 ++
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |  4 +
 .../tsfile/file/metadata/ChunkGroupMetadata.java   | 44 ++++++++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 70 ++++++----------
 .../write/writer/RestorableTsFileIOWriter.java     | 43 +++++-----
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  | 90 +++++++++++---------
 17 files changed, 342 insertions(+), 220 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 33e3090..70b5411 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.record.RowBatch;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.Schema;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 3ef7e0a..dc8a3b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -498,8 +498,7 @@ public class StorageEngine implements IService {
         if (!sequenceFile.isClosed()) {
           continue;
         }
-        String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
-        long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
+        long partitionNum = sequenceFile.getTimePartition();
         Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
             , n -> new HashMap<>());
         storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e006118..0ce93bd 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -62,10 +62,8 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.LoadEmptyFileException;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.exception.PartitionViolationException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -81,7 +79,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
-import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -262,8 +259,7 @@ public class StorageGroupProcessor {
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
-        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+        long partitionNum = resource.getTimePartition();
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
       }
       for (TsFileResource resource : unseqTsFiles) {
@@ -271,8 +267,7 @@ public class StorageGroupProcessor {
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
-        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+        long partitionNum = resource.getTimePartition();
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
       }
 
@@ -296,30 +291,16 @@ public class StorageGroupProcessor {
     }
 
     for (TsFileResource resource : sequenceFileTreeSet) {
-      long timePartitionId = getTimePartitionFromTsFileResource(resource);
-      if (timePartitionId != -1) {
-        latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
-            .putAll(resource.getEndTimeMap());
-        partitionLatestFlushedTimeForEachDevice
-            .computeIfAbsent(timePartitionId, id -> new HashMap<>())
-            .putAll(resource.getEndTimeMap());
-        globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
-      }
+      long timePartitionId = resource.getTimePartition();
+      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
+          .putAll(resource.getEndTimeMap());
+      partitionLatestFlushedTimeForEachDevice
+          .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+          .putAll(resource.getEndTimeMap());
+      globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
     }
   }
 
-  private long getTimePartitionFromTsFileResource(TsFileResource resource) {
-    // device id -> start map
-    // if start time map is empty, tsfile resource is empty, return -1;
-    Map<String, Long> startTimeMap = resource.getStartTimeMap();
-    // just find any time of device
-    Iterator<Long> iterator = startTimeMap.values().iterator();
-    if (iterator.hasNext()) {
-      return StorageEngine.getTimePartition(iterator.next());
-    }
-
-    return -1;
-  }
 
   /**
    * get version controller by time partition Id Thread-safety should be ensure by caller
@@ -393,7 +374,7 @@ public class StorageGroupProcessor {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       sequenceFileTreeSet.add(tsFileResource);
-      long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
+      long timePartitionId = tsFileResource.getTimePartition();
 
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
           getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
@@ -408,7 +389,7 @@ public class StorageGroupProcessor {
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::updateLatestFlushTimeCallback, true, writer);
-        workUnsequenceTsFileProcessors
+        workSequenceTsFileProcessors
             .put(timePartitionId, tsFileProcessor);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -422,7 +403,7 @@ public class StorageGroupProcessor {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       unSequenceFileList.add(tsFileResource);
-      long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
+      long timePartitionId = tsFileResource.getTimePartition();
 
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
           getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
@@ -437,6 +418,8 @@ public class StorageGroupProcessor {
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::unsequenceFlushCallback, false, writer);
+        workUnsequenceTsFileProcessors
+            .put(timePartitionId, tsFileProcessor);
         tsFileResource.setProcessor(tsFileProcessor);
         tsFileProcessor.setTimeRangeId(timePartitionId);
         writer.makeMetadataVisible();
@@ -1217,7 +1200,7 @@ public class StorageGroupProcessor {
         continue;
       }
 
-      long partitionId = getTimePartitionFromTsFileResource(tsFileResource);
+      long partitionId = tsFileResource.getTimePartition();
       deletion.setVersionNum(getVersionControllerByTimePartitionId(partitionId).nextVersion());
 
       // write deletion into modification file
@@ -1508,7 +1491,7 @@ public class StorageGroupProcessor {
    */
   public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getFile();
-    long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
+    long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
     writeLock();
     mergeLock.writeLock().lock();
     try {
@@ -1544,7 +1527,7 @@ public class StorageGroupProcessor {
    */
   public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getFile();
-    long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
+    long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
     writeLock();
     mergeLock.writeLock().lock();
     try {
@@ -1564,7 +1547,7 @@ public class StorageGroupProcessor {
         // check whether the file name needs to be renamed.
         if (!sequenceFileTreeSet.isEmpty()) {
           String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
-              getTimePartitionFromTsFileResource(newTsFileResource), sequenceList);
+              newTsFileResource.getTimePartition(), sequenceList);
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
             logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
                 tsfileToBeInserted.getName(), newFileName);
@@ -1577,8 +1560,7 @@ public class StorageGroupProcessor {
 
       // update latest time map
       updateLatestTimeMap(newTsFileResource);
-      String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource);
-      long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+      long partitionNum = newTsFileResource.getTimePartition();
       partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
           .addAll(newTsFileResource.getHistoricalVersions());
     } catch (DiskSpaceInsufficientException e) {
@@ -1594,40 +1576,6 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Check and get the partition id of a TsFile to be inserted using the start times and end
-   * times of devices.
-   * TODO: when the partition violation happens, split the file and load into different partitions
-   * @throws LoadFileException if the data of the file cross partitions or it is empty
-   */
-  private long getNewFilePartitionId(TsFileResource resource) throws LoadFileException {
-    long partitionId = -1;
-    for (Long startTime : resource.getStartTimeMap().values()) {
-      long p = StorageEngine.getTimePartition(startTime);
-      if (partitionId == -1) {
-        partitionId = p;
-      } else {
-        if (partitionId != p) {
-          throw new PartitionViolationException(resource);
-        }
-      }
-    }
-    for (Long endTime : resource.getEndTimeMap().values()) {
-      long p = StorageEngine.getTimePartition(endTime);
-      if (partitionId == -1) {
-        partitionId = p;
-      } else {
-        if (partitionId != p) {
-          throw new PartitionViolationException(resource);
-        }
-      }
-    }
-    if (partitionId == -1) {
-      throw new LoadEmptyFileException();
-    }
-    return partitionId;
-  }
-
-  /**
    * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
    * @param newTsFileResource
    * @param newFilePartitionId
@@ -1715,15 +1663,10 @@ public class StorageGroupProcessor {
 
   /**
    * If the historical versions of a file is a sub-set of the given file's, remove it to reduce
-<<<<<<< HEAD
-   * unnecessary merge. Only used when the file sender and the receiver share the same file close
-   * policy.
-=======
    * unnecessary merge. Only used when the file sender and the receiver share the same file
    * close policy.
    * Warning: DO NOT REMOVE
    * @param resource
->>>>>>> master
    */
   @SuppressWarnings("unused")
   public void removeFullyOverlapFiles(TsFileResource resource) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index ce2d7da..ce20b85 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -20,10 +20,13 @@ package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
+import org.apache.iotdb.db.exception.PartitionViolationException;
 import org.apache.iotdb.db.service.UpgradeSevice;
+import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -471,4 +474,50 @@ public class TsFileResource {
   public TimeseriesMetadata getTimeSeriesMetadata() {
     return timeSeriesMetadata;
   }
+
+  /**
+   * Callers must make sure that either the startTimeMap is not empty
+   * or the file path contains a time partition folder.
+   */
+  public long getTimePartition() {
+    if (startTimeMap != null && !startTimeMap.isEmpty()) {
+      return StorageEngine.getTimePartition(startTimeMap.values().iterator().next());
+    }
+    String[] splits = FilePathUtils.splitTsFilePath(this);
+    return Long.parseLong(splits[splits.length - 2]);
+  }
+
+  /**
+   * Used when loading new TsFiles that were not generated by the server.
+   * Checks and returns the time partition.
+   * TODO: when the partition violation happens, split the file and load into different partitions
+   * @throws PartitionViolationException if the data of the file crosses partitions or it is empty
+   */
+  public long getTimePartitionWithCheck() throws PartitionViolationException {
+    long partitionId = -1;
+    for (Long startTime : startTimeMap.values()) {
+      long p = StorageEngine.getTimePartition(startTime);
+      if (partitionId == -1) {
+        partitionId = p;
+      } else {
+        if (partitionId != p) {
+          throw new PartitionViolationException(this);
+        }
+      }
+    }
+    for (Long endTime : endTimeMap.values()) {
+      long p = StorageEngine.getTimePartition(endTime);
+      if (partitionId == -1) {
+        partitionId = p;
+      } else {
+        if (partitionId != p) {
+          throw new PartitionViolationException(this);
+        }
+      }
+    }
+    if (partitionId == -1) {
+      throw new PartitionViolationException(this);
+    }
+    return partitionId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index f2ef3ec..de6bfc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -113,6 +112,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -571,9 +571,10 @@ public class PlanExecutor implements IPlanExecutor {
                 file.getAbsolutePath()));
       }
       Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
-      Map<Path, List<ChunkMetadata>> chunkMetaDataListMap = new HashMap<>();
+
+      List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
       try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
-        reader.selfCheck(schemaMap, chunkMetaDataListMap, false);
+        reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
       }
 
       FileLoaderUtils.checkTsFileResource(tsFileResource);
@@ -586,7 +587,7 @@ public class PlanExecutor implements IPlanExecutor {
 
       //create schemas if they doesn't exist
       if (plan.isAutoCreateSchema()) {
-        createSchemaAutomatically(chunkMetaDataListMap, schemaMap, plan.getSgLevel());
+        createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
       }
 
       StorageEngine.getInstance().loadNewTsFile(tsFileResource);
@@ -596,36 +597,34 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
-  private void createSchemaAutomatically(Map<Path, List<ChunkMetadata>> chunkMetaDataListMap,
+  private void createSchemaAutomatically(
+      List<ChunkGroupMetadata> chunkGroupMetadataList,
       Map<Path, MeasurementSchema> knownSchemas, int sgLevel)
       throws QueryProcessException, MetadataException {
-    if (chunkMetaDataListMap.isEmpty()) {
+    if (chunkGroupMetadataList.isEmpty()) {
       return;
     }
-    for (Entry<Path, List<ChunkMetadata>> entry : chunkMetaDataListMap.entrySet()) {
-      String device = entry.getKey().getDevice();
+
+    Set<Path> registeredSeries = new HashSet<>();
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      String device = chunkGroupMetadata.getDevice();
       MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel);
-      for (ChunkMetadata chunkMetaData : entry.getValue()) {
-        String measurement = chunkMetaData.getMeasurementUid();
-        String fullPath = device + IoTDBConstant.PATH_SEPARATOR + measurement;
-        MeasurementSchema schema = knownSchemas.get(entry.getKey());
-        if (schema == null) {
-          throw new MetadataException(String
-              .format("Can not get the schema of measurement [%s]", measurement));
-        }
-        if (!node.hasChild(measurement)) {
-          try {
-            mManager.createTimeseries(fullPath, schema.getType(), schema.getEncodingType(),
-                schema.getCompressor(), Collections.emptyMap());
-          } catch (MetadataException e) {
-            if (!e.getMessage().contains("already exist")) {
-              throw e;
-            }
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+        if (!registeredSeries.contains(series)) {
+          registeredSeries.add(series);
+          MeasurementSchema schema = knownSchemas.get(series);
+          if (schema == null) {
+            throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
+                    chunkMetadata.getMeasurementUid()));
+          }
+          if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
+            mManager.createTimeseries(series.getFullPath(), schema.getType(),
+                schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
+          } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
+            throw new QueryProcessException(
+                String.format("Current Path is not leaf node. %s", series));
           }
-        }
-        if (node.getChild(measurement) instanceof InternalMNode) {
-          throw new QueryProcessException(
-              String.format("Current Path is not leaf node. %s", fullPath));
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index d89e6fb..5e9297a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -45,4 +45,5 @@ public class FilePathUtils {
   public static String[] splitTsFilePath(TsFileResource resource) {
     return resource.getFile().getAbsolutePath().split(PATH_SPLIT_STRING);
   }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index ad22e19..f126c78 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -58,7 +58,6 @@ public class TsFileRecoverPerformer {
   private String insertFilePath;
   private String logNodePrefix;
   private VersionController versionController;
-  private LogReplayer logReplayer;
   private TsFileResource resource;
   private boolean acceptUnseq;
   private boolean isLastFile;
@@ -81,9 +80,6 @@ public class TsFileRecoverPerformer {
    */
   public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
 
-    IMemTable recoverMemTable = new PrimitiveMemTable();
-    this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, resource.getModFile(),
-        versionController, resource, recoverMemTable, acceptUnseq);
     File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath);
     if (!insertFile.exists()) {
       logger.error("TsFile {} is missing, will skip its recovery.", insertFilePath);
@@ -191,13 +187,12 @@ public class TsFileRecoverPerformer {
   private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
       throws StorageGroupProcessorException {
     IMemTable recoverMemTable = new PrimitiveMemTable();
-    this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, resource.getModFile(),
+    LogReplayer logReplayer = new LogReplayer(logNodePrefix, insertFilePath, resource.getModFile(),
         versionController, resource, recoverMemTable, acceptUnseq);
     logReplayer.replayLogs();
     try {
       if (!recoverMemTable.isEmpty()) {
         // flush logs
-
         MemTableFlushTask tableFlushTask = new MemTableFlushTask(recoverMemTable,
             restorableTsFileIOWriter, resource.getFile().getParentFile().getParentFile().getName());
         tableFlushTask.syncFlushMemTable();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
rename to server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
index bdac19b..7acd7a8 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
@@ -34,7 +34,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class IoTDBFlushQueryMergeTest {
+public class IoTDBFlushQueryMergeIT {
 
   private static String[] sqls = new String[]{
       "SET STORAGE GROUP TO root.vehicle.d0",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
rename to server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index 3618e2c..ec9092a 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -42,9 +42,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class IoTDBLoadExternalTsfileTest {
+public class IoTDBLoadExternalTsfileIT {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLoadExternalTsfileTest.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLoadExternalTsfileIT.class);
   private static String[] insertSequenceSqls = new String[]{
       "SET STORAGE GROUP TO root.vehicle",
       "SET STORAGE GROUP TO root.test",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
new file mode 100644
index 0000000..931ecb2
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.Test;
+
+public class IoTDBRestartIT {
+
+  @Test
+  public void testRestart()
+      throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+
+    try(Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()){
+      statement.execute("insert into root.turbine.d1(timestamp,s1) values(1,1)");
+      statement.execute("flush");
+    }
+
+    EnvironmentUtils.restartDaemon();
+
+    try(Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()){
+      statement.execute("insert into root.turbine.d1(timestamp,s1) values(2,1)");
+    }
+
+    EnvironmentUtils.restartDaemon();
+
+    try(Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root");
+        Statement statement = connection.createStatement()){
+      statement.execute("insert into root.turbine.d1(timestamp,s1) values(3,1)");
+
+      boolean hasResultSet = statement.execute("SELECT s1 FROM root.turbine.d1");
+      assertTrue(hasResultSet);
+      String[] exp = new String[]{
+          "1,1",
+          "2,1",
+          "3,1"
+      };
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String result = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2);
+        assertEquals(exp[cnt], result);
+        cnt++;
+      }
+    }
+
+    EnvironmentUtils.cleanEnv();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java
rename to server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 9c05af0..0425390 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -32,7 +32,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-public class IoTDBSimpleQueryTest {
+public class IoTDBSimpleQueryIT {
 
   @Before
   public void setUp() throws Exception {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 716e14b..37162b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -231,6 +231,11 @@ public class EnvironmentUtils {
     }
   }
 
+  public static void restartDaemon() {
+    stopDaemon();
+    reactiveDaemon();
+  }
+
   private static void createAllDir() {
     // create sequential files
     for (String path : directoryManager.getAllSequenceFileFolders()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 8480e4a..3c2a38b 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -215,6 +215,10 @@ public class SeqTsFileRecoverTest {
         versionController, resource, true, true);
     ActiveTimeSeriesCounter.getInstance().init(storageGroup);
     RestorableTsFileIOWriter writer = performer.recover();
+
+    writer.makeMetadataVisible();
+    assertEquals(11, writer.getMetadatasForQuery().size());
+
     assertTrue(writer.canWrite());
     writer.endFile();
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetadata.java
new file mode 100644
index 0000000..d01f041
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetadata.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.file.metadata;
+
+import java.util.List;
+
+/**
+ * The device and ChunkMetadata list of one ChunkGroup. Only maintained when writing, not serialized to TsFile.
+ */
+public class ChunkGroupMetadata {
+
+  private String device;
+
+  private List<ChunkMetadata> chunkMetadataList;
+
+  public ChunkGroupMetadata(String device, List<ChunkMetadata> chunkMetadataList) {
+    this.device = device;
+    this.chunkMetadataList = chunkMetadataList;
+  }
+
+  public String getDevice() {
+    return device;
+  }
+
+  public List<ChunkMetadata> getChunkMetadataList() {
+    return chunkMetadataList;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 93354a6..b7122b8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
@@ -561,10 +562,8 @@ public class TsFileSequenceReader implements AutoCloseable {
   /**
    * Self Check the file and return the position before where the data is safe.
    *
-   * @param newSchema   @OUT. the measurement schema in the file will be added into this parameter.
-   *                    (can be null)
-   * @param chunkMetadataListMap   @OUT. the treeMap (Path -> ChunkmetadataList)
-   *                    (can be null)
+   * @param newSchema   @OUT. the schema of each time series in the file is put into this map
+   * @param chunkGroupMetadataList  @OUT. the ChunkGroupMetadata of every complete ChunkGroup is appended
    * @param fastFinish  if true and the file is complete, then newSchema and newMetaData parameter
    *                    will be not modified.
    * @return the position of the file that is fine. All data after the position in the file should
@@ -572,8 +571,7 @@ public class TsFileSequenceReader implements AutoCloseable {
    */
 
   public long selfCheck(Map<Path, MeasurementSchema> newSchema,
-      Map<Path, List<ChunkMetadata>> chunkMetadataListMap,
-      boolean fastFinish) throws IOException {
+      List<ChunkGroupMetadata> chunkGroupMetadataList, boolean fastFinish) throws IOException {
     File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
     long fileSize;
     if (!checkFile.exists()) {
@@ -586,7 +584,8 @@ public class TsFileSequenceReader implements AutoCloseable {
     TSDataType dataType;
     long fileOffsetOfChunk;
 
-    List<ChunkMetadata> chunks = null;
+    // ChunkMetadata of current ChunkGroup
+    List<ChunkMetadata> chunkMetadataList = null;
     String deviceID;
 
     int position = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER
@@ -611,56 +610,44 @@ public class TsFileSequenceReader implements AutoCloseable {
     boolean newChunkGroup = true;
     // not a complete file, we will recover it...
     long truncatedPosition = TSFileConfig.MAGIC_STRING.getBytes().length;
-    boolean goon = true;
     byte marker;
     int chunkCnt = 0;
     List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
     try {
-      while (goon && (marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+      while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
           case MetaMarker.CHUNK_HEADER:
             // this is the first chunk of a new ChunkGroup.
             if (newChunkGroup) {
               newChunkGroup = false;
-              chunks = new ArrayList<>();
+              chunkMetadataList = new ArrayList<>();
             }
             fileOffsetOfChunk = this.position() - 1;
             // if there is something wrong with a chunk, we will drop the whole ChunkGroup
             // as different chunks may be created by the same insertions(sqls), and partial
             // insertion is not tolerable
-            ChunkHeader header = this.readChunkHeader();
-            measurementID = header.getMeasurementID();
+            ChunkHeader chunkHeader = this.readChunkHeader();
+            measurementID = chunkHeader.getMeasurementID();
             MeasurementSchema measurementSchema = new MeasurementSchema(measurementID,
-                header.getDataType(),
-                header.getEncodingType(), header.getCompressionType());
+                chunkHeader.getDataType(),
+                chunkHeader.getEncodingType(), chunkHeader.getCompressionType());
             measurementSchemaList.add(measurementSchema);
-            dataType = header.getDataType();
+            dataType = chunkHeader.getDataType();
             Statistics<?> chunkStatistics = Statistics.getStatsByType(dataType);
-            if (header.getNumOfPages() > 0) {
-              PageHeader pageHeader = this.readPageHeader(header.getDataType());
-              chunkStatistics.mergeStatistics(pageHeader.getStatistics());
-              this.skipPageData(pageHeader);
-            }
-            for (int j = 1; j < header.getNumOfPages() - 1; j++) {
+            for (int j = 0; j < chunkHeader.getNumOfPages(); j++) {
               // a new Page
-              PageHeader pageHeader = this.readPageHeader(header.getDataType());
-              chunkStatistics.mergeStatistics(pageHeader.getStatistics());
-              this.skipPageData(pageHeader);
-            }
-            if (header.getNumOfPages() > 1) {
-              PageHeader pageHeader = this.readPageHeader(header.getDataType());
+              PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType());
               chunkStatistics.mergeStatistics(pageHeader.getStatistics());
               this.skipPageData(pageHeader);
             }
             currentChunk = new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk,
                 chunkStatistics);
-            chunks.add(currentChunk);
+            chunkMetadataList.add(currentChunk);
             chunkCnt++;
             break;
           case MetaMarker.CHUNK_GROUP_FOOTER:
             // this is a chunk group
-            // if there is something wrong with the ChunkGroup Footer, we will drop this
-            // ChunkGroup
+            // if there is something wrong with the ChunkGroup Footer, we will drop this ChunkGroup
             // because we can not guarantee the correctness of the deviceId.
             ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
             deviceID = chunkGroupFooter.getDeviceID();
@@ -669,12 +656,7 @@ public class TsFileSequenceReader implements AutoCloseable {
                 newSchema.putIfAbsent(new Path(deviceID, tsSchema.getMeasurementId()), tsSchema);
               }
             }
-            if (chunkMetadataListMap != null) {
-              for (ChunkMetadata chunk : chunks) {
-                Path path = new Path(deviceID, chunk.getMeasurementUid());
-                chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(chunk);
-              }
-            }
+            chunkGroupMetadataList.add(new ChunkGroupMetadata(deviceID, chunkMetadataList));
             newChunkGroup = true;
             truncatedPosition = this.position();
 
@@ -684,23 +666,17 @@ public class TsFileSequenceReader implements AutoCloseable {
             break;
           default:
             // the disk file is corrupted, using this file may be dangerous
-            MetaMarker.handleUnexpectedMarker(marker);
-            goon = false;
-            logger.error(String
-                .format("Unrecognized marker detected, this file {%s} may be corrupted", file));
+            throw new IOException("Unexpected marker " + marker);
         }
       }
       // now we read the tail of the data section, so we are sure that the last
-      // ChunkGroupFooter is
-      // complete.
+      // ChunkGroupFooter is complete.
       truncatedPosition = this.position() - 1;
-    } catch (Exception e2) {
+    } catch (Exception e) {
       logger.info("TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
-          file,
-          this.position(), e2.getMessage());
+          file, this.position(), e.getMessage());
     }
-    // Despite the completeness of the data section, we will discard current
-    // FileMetadata
+    // Despite the completeness of the data section, we will discard current FileMetadata
     // so that we can continue to write data into this tsfile.
     return truncatedPosition;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index c1fa885..bef6bb2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -59,7 +60,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   /**
    * all chunk group metadata which have been serialized on disk.
    */
-  private Map<String, Map<String, List<ChunkMetadata>>> metadatas = new HashMap<>();
+  private Map<String, Map<String, List<ChunkMetadata>>> metadatasForQuery = new HashMap<>();
 
   /**
    * @param file a given tsfile path you want to (continue to) write
@@ -90,7 +91,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
         }
 
         // uncompleted file
-        truncatedPosition = reader.selfCheck(knownSchemas, chunkMetadataListMap, true);
+        truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
         totalChunkNum = reader.getTotalChunkNum();
         if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
           out.close();
@@ -168,8 +169,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
   public List<ChunkMetadata> getVisibleMetadataList(String deviceId, String measurementId,
       TSDataType dataType) {
     List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
-    if (metadatas.containsKey(deviceId) && metadatas.get(deviceId).containsKey(measurementId)) {
-      for (ChunkMetadata chunkMetaData : metadatas.get(deviceId).get(measurementId)) {
+    if (metadatasForQuery.containsKey(deviceId) && metadatasForQuery.get(deviceId).containsKey(measurementId)) {
+      for (ChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) {
         // filter: if adevice'sensor is defined as float type, and data has been persistent.
         // Then someone deletes the timeseries and recreate it with Int type. We have to ignore
         // all the stale data.
@@ -181,26 +182,30 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
     return chunkMetadataList;
   }
 
+  public Map<String, Map<String, List<ChunkMetadata>>> getMetadatasForQuery() {
+    return metadatasForQuery;
+  }
+
   /**
    * add all appendChunkMetadatas into memory. After calling this method, other classes can
    * read these metadata.
    */
 
   public void makeMetadataVisible() {
-    List<Pair<String, List<ChunkMetadata>>> newlyFlushedMetadataList = getAppendedRowMetadata();
+    List<ChunkGroupMetadata> newlyFlushedMetadataList = getAppendedRowMetadata();
     if (!newlyFlushedMetadataList.isEmpty()) {
-      for (Pair<String, List<ChunkMetadata>> pair : newlyFlushedMetadataList) {
-        List<ChunkMetadata> rowMetaDataList = pair.right;
-        String deviceId = pair.left;
+      for (ChunkGroupMetadata chunkGroupMetadata : newlyFlushedMetadataList) {
+        List<ChunkMetadata> rowMetaDataList = chunkGroupMetadata.getChunkMetadataList();
+        String device = chunkGroupMetadata.getDevice();
         for (ChunkMetadata chunkMetaData : rowMetaDataList) {
           String measurementId = chunkMetaData.getMeasurementUid();
-          if (!metadatas.containsKey(deviceId)) {
-            metadatas.put(deviceId, new HashMap<>());
+          if (!metadatasForQuery.containsKey(device)) {
+            metadatasForQuery.put(device, new HashMap<>());
           }
-          if (!metadatas.get(deviceId).containsKey(measurementId)) {
-            metadatas.get(deviceId).put(measurementId, new ArrayList<>());
+          if (!metadatasForQuery.get(device).containsKey(measurementId)) {
+            metadatasForQuery.get(device).put(measurementId, new ArrayList<>());
           }
-          metadatas.get(deviceId).get(measurementId).add(chunkMetaData);
+          metadatasForQuery.get(device).get(measurementId).add(chunkMetaData);
         }
       }
     }
@@ -216,12 +221,12 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
    *
    * @return a list of Device ChunkMetadataList Pair
    */
-  private List<Pair<String, List<ChunkMetadata>>> getAppendedRowMetadata() {
-    List<Pair<String, List<ChunkMetadata>>> append = new ArrayList<>();
-    if (lastFlushedChunkGroupIndex < chunkGroupInfoList.size()) {
-      append.addAll(chunkGroupInfoList
-          .subList(lastFlushedChunkGroupIndex, chunkGroupInfoList.size()));
-      lastFlushedChunkGroupIndex = chunkGroupInfoList.size();
+  private List<ChunkGroupMetadata> getAppendedRowMetadata() {
+    List<ChunkGroupMetadata> append = new ArrayList<>();
+    if (lastFlushedChunkGroupIndex < chunkGroupMetadataList.size()) {
+      append.addAll(chunkGroupMetadataList
+          .subList(lastFlushedChunkGroupIndex, chunkGroupMetadataList.size()));
+      lastFlushedChunkGroupIndex = chunkGroupMetadataList.size();
     }
     return append;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 0339f5b..a8146c8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -18,11 +18,13 @@
  */
 package org.apache.iotdb.tsfile.write.writer;
 
+import java.util.Iterator;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
@@ -39,7 +41,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,7 +51,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
- * TSFileIOWriter is used to construct metadata and write data stored in memory to output stream.
+ * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
  */
 public class TsFileIOWriter {
 
@@ -65,16 +66,21 @@ public class TsFileIOWriter {
   }
 
   protected TsFileOutput out;
-  protected List<Pair<String, List<ChunkMetadata>>> chunkGroupInfoList = new ArrayList<>();
   protected boolean canWrite = true;
   protected int totalChunkNum = 0;
   protected int invalidChunkNum;
   protected File file;
-  protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
-  protected Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+
+  // ChunkMetadata of the Chunk currently being flushed
   private ChunkMetadata currentChunkMetadata;
+  // ChunkMetadata list of the ChunkGroup currently being flushed
+  protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
+  // ChunkGroupMetadata of all ChunkGroups flushed so far
+  protected List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
+
   private long markedPosition;
-  private String deviceId;
+  private String currentChunkGroupDeviceId;
   private long currentChunkGroupStartOffset;
   private List<Pair<Long, Long>> versionInfo = new ArrayList<>();
 
@@ -127,7 +133,7 @@ public class TsFileIOWriter {
   }
 
   public void startChunkGroup(String deviceId) throws IOException {
-    this.deviceId = deviceId;
+    this.currentChunkGroupDeviceId = deviceId;
     currentChunkGroupStartOffset = out.getPosition();
     logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
     chunkMetadataList = new ArrayList<>();
@@ -137,16 +143,16 @@ public class TsFileIOWriter {
    * end chunk and write some log. If there is no data in the chunk group, nothing will be flushed.
    */
   public void endChunkGroup() throws IOException {
-    if (deviceId == null || chunkMetadataList.isEmpty()) {
+    if (currentChunkGroupDeviceId == null || chunkMetadataList.isEmpty()) {
       return;
     }
     long dataSize = out.getPosition() - currentChunkGroupStartOffset;
-    ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(deviceId, dataSize,
+    ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(currentChunkGroupDeviceId, dataSize,
         chunkMetadataList.size());
     chunkGroupFooter.serializeTo(out.wrapAsStream());
-    chunkGroupInfoList.add(new Pair<>(deviceId, chunkMetadataList));
+    chunkGroupMetadataList.add(new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
     logger.debug("end chunk group:{}", chunkMetadataList);
-    deviceId = null;
+    currentChunkGroupDeviceId = null;
     chunkMetadataList = null;
   }
 
@@ -204,8 +210,6 @@ public class TsFileIOWriter {
    */
   public void endCurrentChunk() {
     chunkMetadataList.add(currentChunkMetadata);
-    Path path = new Path(deviceId, currentChunkMetadata.getMeasurementUid());
-    chunkMetadataListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(currentChunkMetadata);
     currentChunkMetadata = null;
     totalChunkNum++;
   }
@@ -219,10 +223,17 @@ public class TsFileIOWriter {
 
     // serialize the SEPARATOR of MetaData
     ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
-    
-    logger.debug("get time series list:{}", chunkMetadataListMap.keySet());
-    
-    Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList();
+
+    // group ChunkMetadata by series
+    Map<Path, List<ChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+    for (ChunkGroupMetadata chunkGroupMetadata: chunkGroupMetadataList) {
+      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+        chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+      }
+    }
+
+    Map<String, Pair<Long, Integer>> deviceMetaDataMap = flushAllChunkMetadataList(chunkMetadataListMap);
     
     TsFileMetadata tsFileMetaData = new TsFileMetadata();
     tsFileMetaData.setDeviceMetadataIndex(deviceMetaDataMap);
@@ -231,7 +242,9 @@ public class TsFileIOWriter {
     tsFileMetaData.setInvalidChunkNum(invalidChunkNum);
 
     long footerIndex = out.getPosition();
-    logger.debug("start to flush the footer,file pos:{}", footerIndex);
+    if (logger.isDebugEnabled()) {
+      logger.debug("start to flush the footer,file pos:{}", footerIndex);
+    }
 
     // write TsFileMetaData
     int size = tsFileMetaData.serializeTo(out.wrapAsStream());
@@ -253,19 +266,18 @@ public class TsFileIOWriter {
 
     // close file
     out.close();
-    if (resourceLogger.isInfoEnabled() && file != null) {
-      resourceLogger.info("{} writer is closed.", file.getName());
+    if (resourceLogger.isDebugEnabled() && file != null) {
+      resourceLogger.debug("{} writer is closed.", file.getName());
     }
     canWrite = false;
-    chunkMetadataListMap = new TreeMap<>();
-    logger.info("output stream is closed");
   }
 
   /**
    * Flush ChunkMetadataList and TimeseriesMetaData
    * @return DeviceMetaDataMap in TsFileMetaData
    */
-  private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList() throws IOException {
+  private Map<String, Pair<Long, Integer>> flushAllChunkMetadataList(
+      Map<Path, List<ChunkMetadata>> chunkMetadataListMap) throws IOException {
 
     // convert ChunkMetadataList to this field
     Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new LinkedHashMap<>();
@@ -304,7 +316,7 @@ public class TsFileIOWriter {
         size += timeseriesMetaData.serializeTo(out.wrapAsStream());
       }
       deviceMetadataMap
-          .put(device, new Pair<Long, Integer>(offsetOfFirstTimeseriesMetaDataInDevice, size));
+          .put(device, new Pair<>(offsetOfFirstTimeseriesMetaDataInDevice, size));
     }
     // return
     return deviceMetadataMap;
@@ -323,11 +335,10 @@ public class TsFileIOWriter {
   // device -> ChunkMetadataList
   public Map<String, List<ChunkMetadata>> getDeviceChunkMetadataMap() {
     Map<String, List<ChunkMetadata>> deviceChunkMetadataMap = new HashMap<>();
-    for (Map.Entry<Path, List<ChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
-      Path path = entry.getKey();
-      String device = path.getDevice();
-      deviceChunkMetadataMap.computeIfAbsent(device, k -> new ArrayList<>())
-          .addAll(entry.getValue());
+
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      deviceChunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), k -> new ArrayList<>())
+          .addAll(chunkGroupMetadata.getChunkMetadataList());
     }
     return deviceChunkMetadataMap;
   }
@@ -376,22 +387,27 @@ public class TsFileIOWriter {
   /**
    * Remove such ChunkMetadata that its startTime is not in chunkStartTimes
    */
-
   public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
     Map<Path, Integer> startTimeIdxes = new HashMap<>();
     chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
 
-    for (Map.Entry<Path, List<ChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
-      List<ChunkMetadata> chunkMetadatas = entry.getValue();
-      Path path = entry.getKey();
-      int chunkNum = chunkMetadatas.size();
-      for (ChunkMetadata chunkMetaData : chunkMetadatas) {
+    Iterator<ChunkGroupMetadata> chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator();
+    while (chunkGroupMetaDataIterator.hasNext()) {
+      ChunkGroupMetadata chunkGroupMetaData = chunkGroupMetaDataIterator.next();
+      String deviceId = chunkGroupMetaData.getDevice();
+      int chunkNum = chunkGroupMetaData.getChunkMetadataList().size();
+      Iterator<ChunkMetadata> chunkMetaDataIterator = chunkGroupMetaData.getChunkMetadataList()
+          .iterator();
+      while (chunkMetaDataIterator.hasNext()) {
+        ChunkMetadata chunkMetaData = chunkMetaDataIterator.next();
+        Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
         int startTimeIdx = startTimeIdxes.get(path);
+
         List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
         boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
             && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
         if (!chunkValid) {
-          chunkMetadatas.remove(chunkMetaData);
+          chunkMetaDataIterator.remove();
           chunkNum--;
           invalidChunkNum++;
         } else {
@@ -399,7 +415,7 @@ public class TsFileIOWriter {
         }
       }
       if (chunkNum == 0) {
-        chunkMetadataListMap.remove(path);
+        chunkGroupMetaDataIterator.remove();
       }
     }
   }