You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/11 07:10:06 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: change bufferwrite flush to asynchronous and add MemTablePool to manage all memtables

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 4a76448  change bufferwrite flush to asynchronous and add MemTablePool to manage all memtables
4a76448 is described below

commit 4a7644852fb183d1ea11b813f95b6cc7aaab3d0b
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 11 15:09:31 2019 +0800

    change bufferwrite flush to asynchronous and add MemTablePool to manage all memtables
---
 .../engine/bufferwrite/BufferWriteProcessor.java   |  54 +++++--
 .../bufferwrite/RestorableTsFileIOWriter.java      |  16 +-
 .../db/engine/filenode/FileNodeProcessor.java      |  27 ++--
 .../iotdb/db/engine/memtable/AbstractMemTable.java |   6 +
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  12 ++
 .../db/engine/memtable/MemTableFlushCallBack.java  |   9 ++
 .../db/engine/memtable/MemTableFlushTask.java      | 180 +++++++++++----------
 .../db/engine/memtable/MemTableFlushUtil.java      |   4 +-
 .../iotdb/db/engine/memtable/MemTablePool.java     |  59 +++++++
 .../db/engine/memtable/PrimitiveMemTable.java      |   5 +
 .../db/query/reader/merge/PriorityMergeReader.java |   3 -
 .../apache/iotdb/db/service/CloseMergeService.java |   2 +-
 .../bufferwrite/RestorableTsFileIOWriterTest.java  |   4 +-
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |   9 +-
 .../write/writer/NativeRestorableIOWriter.java     |   2 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  65 +++++---
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |   3 +-
 17 files changed, 299 insertions(+), 161 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index d6b7ccc..b331a57 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -21,11 +21,12 @@ package org.apache.iotdb.db.engine.bufferwrite;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -42,7 +43,7 @@ import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
+import org.apache.iotdb.db.engine.memtable.MemTablePool;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -53,13 +54,14 @@ import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,9 +74,14 @@ public class BufferWriteProcessor extends Processor {
   private volatile Future<Boolean> closeFuture = new BWCloseFuture(new ImmediateFuture<>(true));
   private ReentrantLock flushQueryLock = new ReentrantLock();
   private AtomicLong memSize = new AtomicLong();
-  private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
+  private long memThreshold = TSFileConfig.groupSizeInByte;
   private IMemTable workMemTable;
   private IMemTable flushMemTable;
+
+  // each flush task has a flushId, IO task should scheduled by this id
+  private long flushId = -1;
+  private List<IMemTable> flushingMemTables = new ArrayList<>();
+
   private Action bufferwriteFlushAction;
   //private Action bufferwriteCloseAction;
   private Consumer<BufferWriteProcessor> bufferwriteCloseConsumer;
@@ -153,7 +160,7 @@ public class BufferWriteProcessor extends Processor {
       throw new BufferWriteProcessorException(e);
     }
     if (workMemTable == null) {
-      workMemTable = new PrimitiveMemTable();
+      workMemTable = MemTablePool.getInstance().getEmptyMemTable();
     } else {
       workMemTable.clear();
     }
@@ -290,7 +297,7 @@ public class BufferWriteProcessor extends Processor {
     flushQueryLock.lock();
     LOGGER.info("BufferWrite Processor {} get flushQueryLock for switchWorkToFlush successfully", getProcessorName());
     try {
-      IMemTable temp = flushMemTable == null ? new PrimitiveMemTable() : flushMemTable;
+      IMemTable temp = flushMemTable == null ? MemTablePool.getInstance().getEmptyMemTable() : flushMemTable;
       flushMemTable = workMemTable;
       workMemTable = temp;
       isFlush = true;
@@ -314,26 +321,40 @@ public class BufferWriteProcessor extends Processor {
     }
   }
 
+  private void removeFlushedMemTable(IMemTable memTable, TsFileIOWriter tsFileIOWriter) {
+    this.writeLock();
+    tsFileIOWriter.mergeChunkGroupMetaData();
+    try {
+      flushingMemTables.remove(memTable);
+    } finally {
+      this.writeUnlock();
+    }
+  }
+
 
   /**
    * the caller mast guarantee no other concurrent caller entering this function.
    *
    * @param displayMessage message that will appear in system log.
+   * @param tmpMemTableToFlush
    * @param version the operation version that will tagged on the to be flushed memtable
    * (i.e., ChunkGroup)
    * @param walTaskId used for declaring what the wal file name suffix is.
    * @return true if successfully.
    */
-  private boolean flushTask(String displayMessage, long version, long walTaskId) {
+  private boolean flushTask(String displayMessage,
+      IMemTable tmpMemTableToFlush, long version,
+      long walTaskId) {
     boolean result;
     long flushStartTime = System.currentTimeMillis();
     LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
         displayMessage);
     try {
-      if (flushMemTable != null && !flushMemTable.isEmpty()) {
+      if (tmpMemTableToFlush != null && !tmpMemTableToFlush.isEmpty()) {
         // flush data
-        MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, getProcessorName());
-        tableFlushTask.flushMemTable(fileSchema, flushMemTable, version);
+        MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, getProcessorName(), flushId,
+            this::removeFlushedMemTable);
+        tableFlushTask.flushMemTable(fileSchema, tmpMemTableToFlush, version);
         // write restore information
         writer.flush();
       }
@@ -419,7 +440,14 @@ public class BufferWriteProcessor extends Processor {
         walTaskId = 0;
       }
       valueCount = 0;
-      switchWorkToFlush();
+
+      synchronized (flushingMemTables) {
+        flushingMemTables.add(workMemTable);
+      }
+      IMemTable tmpMemTableToFlush = workMemTable;
+      workMemTable = MemTablePool.getInstance().getEmptyMemTable();
+
+      flushId++;
       long version = versionController.nextVersion();
       BasicMemController.getInstance().releaseUsage(this, memSize.get());
       memSize.set(0);
@@ -429,7 +457,7 @@ public class BufferWriteProcessor extends Processor {
             "flush memtable for bufferwrite processor {} synchronously for close task.",
             getProcessorName(), FlushManager.getInstance().getWaitingTasksNumber(),
             FlushManager.getInstance().getCorePoolSize());
-        flushTask("synchronously", version, walTaskId);
+        flushTask("synchronously", tmpMemTableToFlush, version, walTaskId);
         flushFuture = new ImmediateFuture<>(true);
       } else {
         if (LOGGER.isInfoEnabled()) {
@@ -439,7 +467,7 @@ public class BufferWriteProcessor extends Processor {
               FlushManager.getInstance().getCorePoolSize());
         }
         flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
-            version, walTaskId));
+            tmpMemTableToFlush, version, walTaskId));
       }
     } else {
       flushFuture = new ImmediateFuture<>(true);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
index a7542b5..5362636 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriter.java
@@ -100,8 +100,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
       // cut off tsfile
       this.out = new DefaultTsFileOutput(new FileOutputStream(insertFile, true));
       out.truncate(position);
-      this.chunkGroupMetaDataList = existedMetadatas;
-      lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
+      this.flushedChunkGroupMetaDataList = existedMetadatas;
+      lastFlushedChunkGroupIndex = flushedChunkGroupMetaDataList.size();
       append = new ArrayList<>();
       // recovery the metadata
       recoverMetadata(existedMetadatas);
@@ -122,8 +122,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
         LOGGER.info("remove unsealed tsfile restore file failed: ", e);
       }
       this.out = new DefaultTsFileOutput(new FileOutputStream(insertFile));
-      this.chunkGroupMetaDataList = new ArrayList<>();
-      lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
+      this.flushedChunkGroupMetaDataList = new ArrayList<>();
+      lastFlushedChunkGroupIndex = flushedChunkGroupMetaDataList.size();
       append = new ArrayList<>();
       startFile();
       isNewResource = true;
@@ -298,11 +298,11 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
    * @return a list of chunkgroup metadata
    */
   private List<ChunkGroupMetaData> getAppendedRowGroupMetadata() {
-    if (lastFlushedChunkGroupIndex < chunkGroupMetaDataList.size()) {
+    if (lastFlushedChunkGroupIndex < flushedChunkGroupMetaDataList.size()) {
       append.clear();
-      append.addAll(chunkGroupMetaDataList
-          .subList(lastFlushedChunkGroupIndex, chunkGroupMetaDataList.size()));
-      lastFlushedChunkGroupIndex = chunkGroupMetaDataList.size();
+      append.addAll(flushedChunkGroupMetaDataList
+          .subList(lastFlushedChunkGroupIndex, flushedChunkGroupMetaDataList.size()));
+      lastFlushedChunkGroupIndex = flushedChunkGroupMetaDataList.size();
     }
     return append;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index ded5594..78b41ae 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -33,17 +33,13 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -51,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
-import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -181,7 +176,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    */
   private Map<String, Action> parameters;
   private FileSchema fileSchema;
-  private Action flushFileNodeProcessorAction = () -> {
+  private Action fileNodeFlushAction = () -> {
     synchronized (fileNodeProcessorStore) {
       try {
         writeStoreToDisk(fileNodeProcessorStore);
@@ -403,7 +398,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   void addIntervalFileNode(TsFileResource tsFileResource) throws ActionException {
     newFileNodes.add(tsFileResource);
     fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-    flushFileNodeProcessorAction.act();
+    fileNodeFlushAction.act();
   }
 
   /**
@@ -496,7 +491,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
           //parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
           parameters
-              .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+              .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
           String baseDir = directories
               .getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
           if (LOGGER.isInfoEnabled()) {
@@ -524,7 +519,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     LOGGER.info("The filenode processor {} will recovery the overflow processor.",
         getProcessorName());
     parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
-    parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+    parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
     try {
       overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema,
           versionController);
@@ -571,7 +566,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       params.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
       //params.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
       params
-          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
       String baseDir = directories.getNextFolderForTsfile();
       LOGGER.info("Allocate folder {} for the new bufferwrite processor.", baseDir);
       // construct processor or restore
@@ -605,7 +600,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       // construct processor or restore
       params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
       params
-          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
       overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
           versionController);
     } else if (overflowProcessor.isClosed()) {
@@ -922,7 +917,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       bufferwriteFlushAction.act();
       fileNodeProcessorStore.setNewFileNodes(newFileNodes);
       // reconstruct the inverted index of the newFileNodes
-      flushFileNodeProcessorAction.act();
+      fileNodeFlushAction.act();
       addAllFileIntoIndex(newFileNodes);
     } catch (Exception e) {
       LOGGER.error("Failed to append the tsfile {} to filenode processor {}.", appendFile,
@@ -1575,9 +1570,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         }
         if (mergeIsChunkGroupHasData) {
           // end the new rowGroupMetadata
-          long size = mergeFileWriter.getPos() - mergeStartPos;
-          footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
-          mergeFileWriter.endChunkGroup(footer, 0);
+          mergeFileWriter.endChunkGroup(0);
         }
       }
     } finally {
@@ -2089,7 +2082,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         Objects.equals(newMultiPassCount, that.newMultiPassCount) &&
         Objects.equals(parameters, that.parameters) &&
         Objects.equals(fileSchema, that.fileSchema) &&
-        Objects.equals(flushFileNodeProcessorAction, that.flushFileNodeProcessorAction) &&
+        Objects.equals(fileNodeFlushAction, that.fileNodeFlushAction) &&
         Objects.equals(bufferwriteFlushAction, that.bufferwriteFlushAction) &&
         Objects.equals(overflowFlushAction, that.overflowFlushAction);
   }
@@ -2102,7 +2095,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
         lastMergeTime, bufferWriteProcessor, overflowProcessor, oldMultiPassTokenSet,
         newMultiPassTokenSet, oldMultiPassCount, newMultiPassCount, shouldRecovery, parameters,
-        fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
+        fileSchema, fileNodeFlushAction, bufferwriteFlushAction,
         overflowFlushAction, multiPassLockToken);
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 097f7f7..7e57a6b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -174,4 +174,10 @@ public abstract class AbstractMemTable implements IMemTable {
     }
     return null;
   }
+
+  @Override
+  public boolean containSeries(String deviceId, String measurementId) {
+    Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
+    return deviceMap != null && deviceMap.containsKey(measurementId);
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 53ea2bc..bca8d05 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 
 /**
  * IMemTable is designed to store data points which are not flushed into TsFile yet. An instance of
@@ -42,6 +44,14 @@ public interface IMemTable {
 
   int size();
 
+  default void insert(TSRecord tsRecord) {
+    for (DataPoint dataPoint : tsRecord.dataPointList) {
+      write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
+          tsRecord.time,
+          dataPoint.getValue().toString());
+    }
+  }
+
   ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
       Map<String, String> props);
 
@@ -69,4 +79,6 @@ public interface IMemTable {
    * @return a MemTable with the same data as this one.
    */
   IMemTable copy();
+
+  boolean containSeries(String deviceId, String measurementId);
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java
new file mode 100644
index 0000000..d5e2ae5
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushCallBack.java
@@ -0,0 +1,9 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+@FunctionalInterface
+public interface MemTableFlushCallBack {
+
+  void afterFlush(IMemTable memTable, TsFileIOWriter tsFileIOWriter);
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
index 7b28992..27926c0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java
@@ -1,19 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied.  See the License for the specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.iotdb.db.engine.memtable;
@@ -49,63 +45,38 @@ public class MemTableFlushTask {
   private ConcurrentLinkedQueue memoryTaskQueue = new ConcurrentLinkedQueue();
   private boolean stop = false;
   private String processName;
+  private long flushId;
 
-  //the position of the tsfile when a flush task begins.
-  long startPos;
+  private MemTableFlushCallBack flushCallBack;
+  private IMemTable memTable;
+
+  public MemTableFlushTask(TsFileIOWriter writer, String processName, long flushId,
+      MemTableFlushCallBack callBack) {
+    this.tsFileIoWriter = writer;
+    this.processName = processName;
+    this.flushId = flushId;
+    this.flushCallBack = callBack;
+    ioFlushThread.start();
+    memoryFlushThread.start();
+  }
 
-  //TODO more better way is: for each TsFile, assign it a Executors.singleThreadPool,
-  // rather than per each memtable.
-  private Thread ioFlushThread = new Thread(() -> {
-    long ioTime = 0;
-    while (!stop) {
-      Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
-      if (seriesWriterOrEndChunkGroupTask == null ) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException e) {
-          LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
-        }
-      } else {
-        long starTime = System.currentTimeMillis();
-        try {
-          if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
-            ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
-          } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
-            tsFileIoWriter.startFlushChunkGroup((String)seriesWriterOrEndChunkGroupTask);
-          } else {
-            ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
-            long memSize = tsFileIoWriter.getPos() - startPos;
-            ChunkGroupFooter footer = new ChunkGroupFooter(task.deviceId, memSize, task.seriesNumber);
-            tsFileIoWriter.endChunkGroup(footer, task.version);
-            startPos = tsFileIoWriter.getPos();
-            task.finished = true;
-          }
-        } catch (IOException e) {
-          LOGGER.error("BufferWrite Processor {}, io error.", processName, e);
-          throw new RuntimeException(e);
-        }
-        ioTime += System.currentTimeMillis() - starTime;
-      }
-    }
-    LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk:  io cost {} ms.", processName, ioTime );
-  });
 
   private Thread memoryFlushThread = new Thread(() -> {
     long memSerializeTime = 0;
     while (!stop) {
       Object task = memoryTaskQueue.poll();
-      if (task == null ) {
+      if (task == null) {
         try {
           Thread.sleep(10);
         } catch (InterruptedException e) {
           LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
         }
       } else {
-        if (task instanceof  String) {
+        if (task instanceof String) {
           ioTaskQueue.add(task);
         } else if (task instanceof ChunkGroupIoTask) {
           ioTaskQueue.add(task);
-        }else {
+        } else {
           long starTime = System.currentTimeMillis();
           Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
           ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
@@ -123,48 +94,91 @@ public class MemTableFlushTask {
         }
       }
     }
-    LOGGER.info("BufferWrite Processor {},flushing a memtable into disk: serialize data into mem cost {} ms.", processName, memSerializeTime );
+    LOGGER.info(
+        "BufferWrite Processor {},flushing a memtable into disk: serialize data into mem cost {} ms.",
+        processName, memSerializeTime);
+  });
+
+
+  //TODO more better way is: for each TsFile, assign it a Executors.singleThreadPool,
+  // rather than per each memtable.
+  private Thread ioFlushThread = new Thread(() -> {
+    long ioTime = 0;
+    while (tsFileIoWriter.getFlushID().get() != flushId) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        LOGGER.error("Processor {}, last flush io task is not finished.", processName, e);
+      }
+    }
+    while (!stop) {
+      Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
+      if (seriesWriterOrEndChunkGroupTask == null) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOGGER.error("BufferWrite Processor {}, io flush task is interrupted.", processName, e);
+        }
+      } else {
+        long starTime = System.currentTimeMillis();
+        try {
+          if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
+            ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
+          } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
+            tsFileIoWriter.startFlushChunkGroup((String) seriesWriterOrEndChunkGroupTask);
+          } else {
+            ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
+            tsFileIoWriter.endChunkGroup(task.version);
+            task.finished = true;
+          }
+        } catch (IOException e) {
+          LOGGER.error("BufferWrite Processor {}, io error.", processName, e);
+          throw new RuntimeException(e);
+        }
+        ioTime += System.currentTimeMillis() - starTime;
+      }
+    }
+    flushCallBack.afterFlush(memTable, tsFileIoWriter);
+    MemTablePool.getInstance().release(memTable);
+    tsFileIoWriter.getFlushID().getAndIncrement();
+    LOGGER.info("BufferWrite Processor {}, flushing a memtable into disk:  io cost {} ms.",
+        processName, ioTime);
   });
 
-  public MemTableFlushTask(TsFileIOWriter writer, String processName) {
-    this.tsFileIoWriter = writer;
-    this.processName = processName;
-    ioFlushThread.start();
-    memoryFlushThread.start();
-  }
 
-  private  void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
+  private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
       TSDataType dataType)
       throws IOException {
     for (TimeValuePair timeValuePair : tvPairs) {
       switch (dataType) {
         case BOOLEAN:
-            seriesWriterImpl
-                    .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+          seriesWriterImpl
+              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
           break;
         case INT32:
-            seriesWriterImpl.write(timeValuePair.getTimestamp(),
-                    timeValuePair.getValue().getInt());
+          seriesWriterImpl.write(timeValuePair.getTimestamp(),
+              timeValuePair.getValue().getInt());
           break;
         case INT64:
-            seriesWriterImpl.write(timeValuePair.getTimestamp(),
-                    timeValuePair.getValue().getLong());
+          seriesWriterImpl.write(timeValuePair.getTimestamp(),
+              timeValuePair.getValue().getLong());
           break;
         case FLOAT:
-            seriesWriterImpl.write(timeValuePair.getTimestamp(),
-                    timeValuePair.getValue().getFloat());
+          seriesWriterImpl.write(timeValuePair.getTimestamp(),
+              timeValuePair.getValue().getFloat());
           break;
         case DOUBLE:
-            seriesWriterImpl
-                    .write(timeValuePair.getTimestamp(),
-                            timeValuePair.getValue().getDouble());
+          seriesWriterImpl
+              .write(timeValuePair.getTimestamp(),
+                  timeValuePair.getValue().getDouble());
           break;
         case TEXT:
-            seriesWriterImpl
-                    .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+          seriesWriterImpl
+              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
           break;
         default:
-          LOGGER.error("BufferWrite Processor {}, don't support data type: {}", processName, dataType);
+          LOGGER.error("BufferWrite Processor {}, don't support data type: {}", processName,
+              dataType);
           break;
       }
     }
@@ -173,11 +187,10 @@ public class MemTableFlushTask {
   /**
    * the function for flushing memtable.
    */
-  public  void flushMemTable(FileSchema fileSchema,
-      IMemTable imemTable, long version) throws IOException {
+  public void flushMemTable(FileSchema fileSchema, IMemTable imemTable, long version) {
     long sortTime = 0;
-    startPos = tsFileIoWriter.getPos();
     ChunkGroupIoTask theLastTask = EMPTY_TASK;
+    this.memTable = imemTable;
     for (String deviceId : imemTable.getMemTableMap().keySet()) {
       memoryTaskQueue.add(deviceId);
       int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
@@ -194,12 +207,14 @@ public class MemTableFlushTask {
       memoryTaskQueue.add(theLastTask);
     }
     LOGGER.info(
-        "BufferWrite Processor {}, flushing a memtable into disk: data sort time cost {} ms.", processName, sortTime );
+        "BufferWrite Processor {}, flushing a memtable into disk: data sort time cost {} ms.",
+        processName, sortTime);
     while (!theLastTask.finished) {
       try {
         Thread.sleep(10);
       } catch (InterruptedException e) {
-        LOGGER.error("BufferWrite Processor {}, flush memtable table thread is interrupted.", processName, e);
+        LOGGER.error("BufferWrite Processor {}, flush memtable table thread is interrupted.",
+            processName, e);
         throw new RuntimeException(e);
       }
     }
@@ -208,6 +223,7 @@ public class MemTableFlushTask {
 
 
   static class ChunkGroupIoTask {
+
     int seriesNumber;
     String deviceId;
     long version;
@@ -216,6 +232,7 @@ public class MemTableFlushTask {
     public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
       this(seriesNumber, deviceId, version, false);
     }
+
     public ChunkGroupIoTask(int seriesNumber, String deviceId, long version, boolean finished) {
       this.seriesNumber = seriesNumber;
       this.deviceId = deviceId;
@@ -223,6 +240,7 @@ public class MemTableFlushTask {
       this.finished = finished;
     }
   }
+
   private static ChunkGroupIoTask EMPTY_TASK = new ChunkGroupIoTask(0, "", 0, true);
 
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
index 4aa12e9..e9a24c7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
@@ -110,9 +110,7 @@ public class MemTableFlushUtil {
         ioTime += System.currentTimeMillis() - startTime;
       }
       tmpTime = System.currentTimeMillis();
-      long memSize = tsFileIoWriter.getPos() - startPos;
-      ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, memSize, seriesNumber);
-      tsFileIoWriter.endChunkGroup(footer, version);
+      tsFileIoWriter.endChunkGroup(version);
       ioTime += System.currentTimeMillis() - tmpTime;
     }
     LOGGER.info(
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
new file mode 100644
index 0000000..1fb4a64
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -0,0 +1,59 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import java.util.Stack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemTablePool {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemTablePool.class);
+
+  private Stack<IMemTable> emptyMemTables;
+  private int capacity = 10;
+  private int size = 0;
+
+  private static final MemTablePool INSTANCE = new MemTablePool();
+
+  public MemTablePool() {
+    emptyMemTables = new Stack<>();
+  }
+
+  public IMemTable getEmptyMemTable() {
+    synchronized (emptyMemTables) {
+      if (emptyMemTables.isEmpty() && size < capacity) {
+        size++;
+        return new PrimitiveMemTable();
+      } else if (!emptyMemTables.isEmpty()){
+        return emptyMemTables.pop();
+      }
+    }
+    // wait (poll) until someone has released a memtable back into the pool
+    while (true) {
+      if(!emptyMemTables.isEmpty()) {
+        synchronized (emptyMemTables) {
+          if (!emptyMemTables.isEmpty()){
+            return emptyMemTables.pop();
+          }
+        }
+      }
+      try {
+        Thread.sleep(20);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.error("Unexpected interruption", e);
+      }
+    }
+  }
+
+
+  public void release(IMemTable memTable) {
+    synchronized (emptyMemTables) {
+      memTable.clear();
+      emptyMemTables.push(memTable);
+    }
+  }
+
+  public static MemTablePool getInstance() {
+    return INSTANCE;
+  }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 455196a..eac3323 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -44,4 +44,9 @@ public class PrimitiveMemTable extends AbstractMemTable {
 
     return new PrimitiveMemTable(newMap);
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    return this == obj;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java
index e76d142..251bd9d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/merge/PriorityMergeReader.java
@@ -33,9 +33,6 @@ import org.apache.iotdb.db.utils.TimeValuePair;
  */
 public class PriorityMergeReader implements IPointReader {
 
-  public static final int LOW_PRIORITY = 1;
-  public static final int HIGH_PRIORITY = 2;
-
   private List<IPointReader> readerList = new ArrayList<>();
   private List<Integer> priorityList = new ArrayList<>();
   private PriorityQueue<Element> heap = new PriorityQueue<>();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java b/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
index 4131396..0e7cb8a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/CloseMergeService.java
@@ -143,7 +143,7 @@ public class CloseMergeService implements IService {
     @Override
     public void run() {
       service.scheduleWithFixedDelay(mergeService, MERGE_DELAY, MERGE_PERIOD, TimeUnit.SECONDS);
-      service.scheduleWithFixedDelay(closeService, CLOSE_DELAY, CLOSE_PERIOD, TimeUnit.SECONDS);
+//      service.scheduleWithFixedDelay(closeService, CLOSE_DELAY, CLOSE_PERIOD, TimeUnit.SECONDS);
       while (!service.isShutdown()) {
         synchronized (service) {
           try {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index 1f8e3e2..3c239c8 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -167,7 +167,7 @@ public class RestorableTsFileIOWriterTest {
     memTable.write("d2", "s2", TSDataType.INT32, 2, "1");
     memTable.write("d2", "s2", TSDataType.INT32, 4, "1");
     //MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
-    MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test");
+    MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test", 0L, (a,b) -> {});
     tableFlushTask.flushMemTable(schema, memTable, 0);
     writer.flush();
     writer.appendMetadata();
@@ -222,7 +222,7 @@ public class RestorableTsFileIOWriterTest {
         MemTableTestUtils.dataType0);
 
     //MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
-    MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test");
+    MemTableFlushTask tableFlushTask = new MemTableFlushTask(writer, "test", 0L, (a,b) -> {});
     tableFlushTask.flushMemTable(MemTableTestUtils.getFileSchema(), memTable, 0);
 
     writer.flush();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index b201324..52cfdf9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -74,11 +74,6 @@ public class TsFileWriter implements AutoCloseable{
    **/
   private long recordCountForNextMemCheck = 100;
   private long chunkGroupSizeThreshold;
-  /**
-   * In an individual TsFile, version number is not meaningful, added
-   * only for tests.
-   */
-  private long version = 0;
 
   /**
    * init this TsFileWriter.
@@ -295,8 +290,7 @@ public class TsFileWriter implements AutoCloseable{
               "Flushed data size is inconsistent with computation! Estimated: %d, Actuall: %d",
               chunkGroupFooter.getDataSize(), fileWriter.getPos() - pos));
         }
-
-        fileWriter.endChunkGroup(chunkGroupFooter, version++);
+        fileWriter.endChunkGroup(0);
       }
       long actualTotalChunkGroupSize = fileWriter.getPos() - totalMemStart;
       LOG.info("total chunk group size:{}", actualTotalChunkGroupSize);
@@ -317,6 +311,7 @@ public class TsFileWriter implements AutoCloseable{
    *
    * @throws IOException exception in IO
    */
+  @Override
   public void close() throws IOException {
     LOG.info("start close file");
     flushAllChunkGroups();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
index 2496d42..6fb4ee5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
@@ -68,7 +68,7 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
           out.close();
           return;
         }
-        truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetaDataList, !append);
+        truncatedPosition = reader.selfCheck(knownSchemas, flushedChunkGroupMetaDataList, !append);
         if (truncatedPosition == TsFileCheckStatus.COMPLETE_FILE && !append) {
             this.canWrite = false;
             out.close();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index c83c8e8..75301e1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
 import org.apache.iotdb.tsfile.file.MetaMarker;
@@ -65,7 +66,9 @@ public class TsFileIOWriter {
   }
 
   protected TsFileOutput out;
-  protected List<ChunkGroupMetaData> chunkGroupMetaDataList = new ArrayList<>();
+  private AtomicLong flushID = new AtomicLong(0);
+  protected List<ChunkGroupMetaData> flushedChunkGroupMetaDataList = new ArrayList<>();
+  protected List<ChunkGroupMetaData> flushingChunkGroupMetaDataList = new ArrayList<>();
   private ChunkGroupMetaData currentChunkGroupMetaData;
   private ChunkMetaData currentChunkMetaData;
   protected boolean canWrite = true;
@@ -104,14 +107,14 @@ public class TsFileIOWriter {
    * data in the TsFileOutput matches the given metadata list
    *
    * @param out the target output
-   * @param chunkGroupMetaDataList existing chunkgroups' metadata
+   * @param flushedChunkGroupMetaDataList existing chunkgroups' metadata
    * @throws IOException if I/O error occurs
    */
-  public TsFileIOWriter(TsFileOutput out, List<ChunkGroupMetaData> chunkGroupMetaDataList)
+  public TsFileIOWriter(TsFileOutput out, List<ChunkGroupMetaData> flushedChunkGroupMetaDataList)
       throws IOException {
     this.out = out;
-    this.chunkGroupMetaDataList = chunkGroupMetaDataList;
-    if (chunkGroupMetaDataList.isEmpty()) {
+    this.flushedChunkGroupMetaDataList = flushedChunkGroupMetaDataList;
+    if (flushedChunkGroupMetaDataList.isEmpty()) {
       startFile();
     }
   }
@@ -131,6 +134,19 @@ public class TsFileIOWriter {
     out.write(magicStringBytes);
   }
 
+  public AtomicLong getFlushID() {
+    return flushID;
+  }
+
+  /**
+   * move ChunkGroupMetadata from flushingChunkGroupMetaDataList to flushedChunkGroupMetaDataList
+   * only flushedChunkGroupMetaDataList is visible to queries
+   */
+  public void mergeChunkGroupMetaData() {
+    flushedChunkGroupMetaDataList.addAll(flushingChunkGroupMetaDataList);
+    flushingChunkGroupMetaDataList.clear();
+  }
+
   /**
    * start a {@linkplain ChunkGroupMetaData ChunkGroupMetaData}.
    *
@@ -142,6 +158,21 @@ public class TsFileIOWriter {
   }
 
   /**
+   * end the current chunk group: serialize its footer, record its metadata, and log the event.
+   */
+  public void endChunkGroup(long version) throws IOException {
+    long dataSize = out.getPosition() - currentChunkGroupMetaData.getStartOffsetOfChunkGroup();
+    ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter(currentChunkGroupMetaData.getDeviceID(),
+        dataSize, currentChunkGroupMetaData.getChunkMetaDataList().size());
+    chunkGroupFooter.serializeTo(out.wrapAsStream());
+    currentChunkGroupMetaData.setEndOffsetOfChunkGroup(out.getPosition());
+    currentChunkGroupMetaData.setVersion(version);
+    flushingChunkGroupMetaDataList.add(currentChunkGroupMetaData);
+    LOG.debug("end chunk group:{}", currentChunkGroupMetaData);
+    currentChunkGroupMetaData = null;
+  }
+
+  /**
    * start a {@linkplain ChunkMetaData ChunkMetaData}.
    *
    * @param descriptor - measurement of this time series
@@ -200,20 +231,6 @@ public class TsFileIOWriter {
   }
 
   /**
-   * end chunk and write some log.
-   *
-   * @param chunkGroupFooter -use to serialize
-   */
-  public void endChunkGroup(ChunkGroupFooter chunkGroupFooter, long version) throws IOException {
-    chunkGroupFooter.serializeTo(out.wrapAsStream());
-    currentChunkGroupMetaData.setEndOffsetOfChunkGroup(out.getPosition());
-    currentChunkGroupMetaData.setVersion(version);
-    chunkGroupMetaDataList.add(currentChunkGroupMetaData);
-    LOG.debug("end chunk group:{}", currentChunkGroupMetaData);
-    currentChunkGroupMetaData = null;
-  }
-
-  /**
    * write {@linkplain TsFileMetaData TSFileMetaData} to output stream and close it.
    *
    * @param schema FileSchema
@@ -221,6 +238,8 @@ public class TsFileIOWriter {
    */
   public void endFile(FileSchema schema) throws IOException {
 
+    mergeChunkGroupMetaData();
+
     // serialize the SEPARATOR of MetaData and ChunkGroups
     ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
 
@@ -229,7 +248,7 @@ public class TsFileIOWriter {
     LOG.debug("get time series list:{}", schemaDescriptors);
 
     Map<String, TsDeviceMetadataIndex> tsDeviceMetadataIndexMap = flushTsDeviceMetaDataAndGetIndex(
-        this.chunkGroupMetaDataList);
+        this.flushedChunkGroupMetaDataList);
 
     TsFileMetaData tsFileMetaData = new TsFileMetaData(tsDeviceMetadataIndexMap, schemaDescriptors,
         TSFileConfig.CURRENT_VERSION);
@@ -254,7 +273,7 @@ public class TsFileIOWriter {
   }
 
   /**
-   * 1. group chunkGroupMetaDataList to TsDeviceMetadata 2. flush TsDeviceMetadata 3. get
+   * 1. group flushedChunkGroupMetaDataList to TsDeviceMetadata 2. flush TsDeviceMetadata 3. get
    * TsDeviceMetadataIndex
    *
    * @param chunkGroupMetaDataList all chunk group metadata in memory
@@ -321,12 +340,12 @@ public class TsFileIOWriter {
   }
 
   /**
-   * get chunkGroupMetaDataList.
+   * get flushedChunkGroupMetaDataList.
    *
    * @return - List of chunkGroupMetaData
    */
   public List<ChunkGroupMetaData> getChunkGroupMetaDatas() {
-    return chunkGroupMetaDataList;
+    return flushedChunkGroupMetaDataList;
   }
 
   public boolean canWrite() {
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index 8c6cf00..a00e39a 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -60,8 +60,7 @@ public class TsFileIOWriterTest {
     writer.startFlushChunk(measurementSchema, measurementSchema.getCompressor(),
         measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0);
     writer.endChunk(0);
-    ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, 0, 1);
-    writer.endChunkGroup(footer, 0);
+    writer.endChunkGroup(0);
 
     // end file
     writer.endFile(fileSchema);