You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/27 12:07:32 UTC

[incubator-iotdb] 03/03: add TVList

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit bfc9b43f110b823761d59e4dd43f13e9246e971c
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 27 20:07:06 2019 +0800

    add TVList
---
 .../db/engine/memtable/IWritableMemChunk.java      |   7 +-
 .../db/engine/memtable/MemTableFlushTaskV2.java    |  39 ++++----
 .../db/engine/memtable/PrimitiveMemTable.java      |   2 +-
 .../db/engine/memtable/TimeValuePairSorter.java    |   1 -
 .../db/engine/memtable/WritableMemChunkV2.java     | 107 +++++++++++++++------
 .../{LongTVList.java => DoubleTVList.java}         |  39 ++++----
 .../iotdb/db/utils/datastructure/LongTVList.java   |   1 -
 .../iotdb/db/utils/datastructure/TVList.java       |   3 +-
 .../writelog/manager/MultiFileLogNodeManager.java  |   2 -
 .../db/writelog/node/ExclusiveWriteLogNode.java    |   3 -
 .../writelog/recover/TsFileRecoverPerformer.java   |   7 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java |   4 +-
 12 files changed, 127 insertions(+), 88 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index f17fcb7..4888b6b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.memtable;
 
+import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 
@@ -39,8 +40,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
 
   void write(long insertTime, Object insertValue);
 
-//  void reset();
-
   long count();
 
   TSDataType getType();
@@ -49,5 +48,7 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
 
   void releasePrimitiveArrayList();
 
-  DeduplicatedSortedData getDeduplicatedSortedData();
+  default DeduplicatedSortedData getDeduplicatedSortedData(){return null;}
+
+  default TVList getSortedTVList(){return null;}
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 3271041..0df3bc0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 public class MemTableFlushTaskV2 {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTaskV2.class);
   private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
   private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
       .getInstance();
@@ -81,9 +82,9 @@ public class MemTableFlushTaskV2 {
         // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
         IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
         MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
-        List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
+        TVList tvList = series.getSortedTVList();
         sortTime += System.currentTimeMillis() - startTime;
-        encodingTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
+        encodingTaskQueue.add(new Pair<>(tvList, desc));
       }
       encodingTaskQueue.add(new ChunkGroupIoTask(seriesNumber, deviceId, memTable.getVersion()));
     }
@@ -133,7 +134,7 @@ public class MemTableFlushTaskV2 {
               ioTaskQueue.add(task);
             } else {
               long starTime = System.currentTimeMillis();
-              Pair<List<TimeValuePair>, MeasurementSchema> encodingMessage = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
+              Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
               ChunkBuffer chunkBuffer = new ChunkBuffer(encodingMessage.right);
               IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right, chunkBuffer,
                   PAGE_SIZE_THRESHOLD);
@@ -214,35 +215,35 @@ public class MemTableFlushTaskV2 {
   };
 
 
-  private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
+  private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
       TSDataType dataType)
       throws IOException {
-    for (TimeValuePair timeValuePair: tvPairs) {
+
+    for (int i = 0; i < tvPairs.size(); i++) {
+      long time = tvPairs.getTime(i);
+      if (time < tvPairs.getTimeOffset() ||
+          (i+1 < tvPairs.size() && (time == tvPairs.getTime(i+1)))) {
+        continue;
+      }
+
       switch (dataType) {
         case BOOLEAN:
-          seriesWriterImpl
-              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+          seriesWriterImpl.write(time, tvPairs.getBoolean(i));
           break;
         case INT32:
-          seriesWriterImpl.write(timeValuePair.getTimestamp(),
-              timeValuePair.getValue().getInt());
+          seriesWriterImpl.write(time, tvPairs.getInt(i));
           break;
         case INT64:
-          seriesWriterImpl.write(timeValuePair.getTimestamp(),
-              timeValuePair.getValue().getLong());
+          seriesWriterImpl.write(time, tvPairs.getLong(i));
           break;
         case FLOAT:
-          seriesWriterImpl.write(timeValuePair.getTimestamp(),
-              timeValuePair.getValue().getFloat());
+          seriesWriterImpl.write(time, tvPairs.getFloat(i));
           break;
         case DOUBLE:
-          seriesWriterImpl
-              .write(timeValuePair.getTimestamp(),
-                  timeValuePair.getValue().getDouble());
+          seriesWriterImpl.write(time, tvPairs.getDouble(i));
           break;
         case TEXT:
-          seriesWriterImpl
-              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+          seriesWriterImpl.write(time, tvPairs.getBinary(i));
           break;
         default:
           LOGGER.error("Storage group {}, don't support data type: {}", storageGroup,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index c953eb6..2901e3e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -35,7 +35,7 @@ public class PrimitiveMemTable extends AbstractMemTable {
 
   @Override
   protected IWritableMemChunk genMemSeries(TSDataType dataType) {
-    return new WritableMemChunk(dataType);
+    return new WritableMemChunkV2(dataType);
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
index 5552f47..ffbb7ec 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.iotdb.db.utils.TimeValuePair;
 
-@FunctionalInterface
 public interface TimeValuePairSorter {
 
   /**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
index 917515e..b303b6d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
@@ -19,49 +19,33 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import org.apache.iotdb.db.utils.PrimitiveArrayListV2;
-import org.apache.iotdb.db.utils.PrimitiveDataListPool;
 import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.utils.TsPrimitiveType;
-import org.apache.iotdb.db.utils.datastructure.LongTVList;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsBinary;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsBoolean;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsDouble;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsFloat;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsInt;
+import org.apache.iotdb.db.utils.TsPrimitiveType.TsLong;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class WritableMemChunkV2 {
+public class WritableMemChunkV2 implements IWritableMemChunk {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(WritableMemChunkV2.class);
   private TSDataType dataType;
   private TVList list;
-  private long timeOffset = 0;
 
   public WritableMemChunkV2(TSDataType dataType) {
     this.dataType = dataType;
-    this.list = getTVList(dataType);
-  }
-
-  public TVList getTVList(TSDataType dataType) {
-    switch (dataType) {
-      case BOOLEAN:
-        return null;
-      case INT32:
-        return null;
-      case INT64:
-        return new LongTVList();
-      case FLOAT:
-        return null;
-      case DOUBLE:
-        return null;
-      case TEXT:
-        return null;
-      default:
-        throw new UnSupportedDataTypeException("DataType: " + dataType);
-    }
+    this.list = TVList.newList(dataType);
   }
 
+  @Override
   public void write(long insertTime, String insertValue) {
     switch (dataType) {
       case BOOLEAN:
@@ -87,6 +71,7 @@ public class WritableMemChunkV2 {
     }
   }
 
+  @Override
   public void write(long insertTime, Object value) {
     switch (dataType) {
       case BOOLEAN:
@@ -112,40 +97,98 @@ public class WritableMemChunkV2 {
     }
   }
 
+
+  @Override
   public void putLong(long t, long v) {
     list.putLong(t, v);
   }
 
+  @Override
   public void putInt(long t, int v) {
     list.putInt(t, v);
   }
 
+  @Override
   public void putFloat(long t, float v) {
     list.putFloat(t, v);
   }
 
+  @Override
   public void putDouble(long t, double v) {
     list.putDouble(t, v);
   }
 
+  @Override
   public void putBinary(long t, Binary v) {
     list.putBinary(t, v);
   }
 
+  @Override
   public void putBoolean(long t, boolean v) {
     list.putBoolean(t, v);
   }
 
+  @Override
+  public TVList getSortedTVList() {
+    list.sort();
+    return list;
+  }
+
+  @Override
+  public long count() {
+    return list.size();
+  }
+
+  @Override
   public TSDataType getType() {
     return dataType;
   }
 
+  @Override
   public void setTimeOffset(long offset) {
-    timeOffset = offset;
+    list.setTimeOffset(offset);
   }
 
-  public TVList getSortedList() {
-    list.sort();
-    return list;
+  @Override
+  public List<TimeValuePair> getSortedTimeValuePairList() {
+   List<TimeValuePair> result = new ArrayList<>();
+   TVList cloneList = list.clone();
+   cloneList.sort();
+   for (int i = 0; i < cloneList.size(); i++) {
+     long time = cloneList.getTime(i);
+     if (time < cloneList.getTimeOffset() ||
+         (i+1 < cloneList.size() && (time == cloneList.getTime(i+1)))) {
+       continue;
+     }
+
+     switch (dataType) {
+       case BOOLEAN:
+         result.add(new TimeValuePair(time, new TsBoolean(cloneList.getBoolean(i))));
+         break;
+       case INT32:
+         result.add(new TimeValuePair(time, new TsInt(cloneList.getInt(i))));
+         break;
+       case INT64:
+         result.add(new TimeValuePair(time, new TsLong(cloneList.getLong(i))));
+         break;
+       case FLOAT:
+         result.add(new TimeValuePair(time, new TsFloat(cloneList.getFloat(i))));
+         break;
+       case DOUBLE:
+         result.add(new TimeValuePair(time, new TsDouble(cloneList.getDouble(i))));
+         break;
+       case TEXT:
+         result.add(new TimeValuePair(time, new TsBinary(cloneList.getBinary(i))));
+         break;
+       default:
+         LOGGER.error("don't support data type: {}", dataType);
+         break;
+     }
+   }
+   return result;
   }
+
+  @Override
+  public void releasePrimitiveArrayList(){}
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
similarity index 80%
copy from iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
copy to iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index 215adbe..9fa7fd8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -21,22 +21,21 @@ package org.apache.iotdb.db.utils.datastructure;
 import java.util.ArrayList;
 import java.util.List;
 
-public class LongTVList extends TVList {
+public class DoubleTVList extends TVList {
 
-  private List<long[]> values;
+  private List<double[]> values;
 
-  private long[] sortedValues;
+  private double[] sortedValues;
 
-  public LongTVList() {
+  public DoubleTVList() {
     super();
     values = new ArrayList<>();
-
   }
 
   @Override
-  public void putLong(long timestamp, long value) {
+  public void putDouble(long timestamp, double value) {
     if ((size % SINGLE_ARRAY_SIZE) == 0) {
-      values.add(new long[SINGLE_ARRAY_SIZE]);
+      values.add(new double[SINGLE_ARRAY_SIZE]);
       timestamps.add(new long[SINGLE_ARRAY_SIZE]);
     }
     int arrayIndex = size / SINGLE_ARRAY_SIZE;
@@ -47,7 +46,7 @@ public class LongTVList extends TVList {
   }
 
   @Override
-  public long getLong(int index) {
+  public double getDouble(int index) {
     if (index >= size) {
       throw new ArrayIndexOutOfBoundsException(index);
     }
@@ -60,7 +59,7 @@ public class LongTVList extends TVList {
     }
   }
 
-  public void set(int index, long timestamp, long value) {
+  public void set(int index, long timestamp, double value) {
     if (index >= size) {
       throw new ArrayIndexOutOfBoundsException(index);
     }
@@ -71,10 +70,10 @@ public class LongTVList extends TVList {
   }
 
   @Override
-  public LongTVList clone() {
-    LongTVList cloneList = new LongTVList();
+  public DoubleTVList clone() {
+    DoubleTVList cloneList = new DoubleTVList();
     if (!sorted) {
-      for (long[] valueArray : values) {
+      for (double[] valueArray : values) {
         cloneList.values.add(cloneValue(valueArray));
       }
       for (long[] timestampArray : timestamps) {
@@ -82,7 +81,7 @@ public class LongTVList extends TVList {
       }
     } else {
       cloneList.sortedTimestamps = new long[size];
-      cloneList.sortedValues = new long[size];
+      cloneList.sortedValues = new double[size];
       System.arraycopy(sortedTimestamps, 0, cloneList.sortedTimestamps, 0, size);
       System.arraycopy(sortedValues, 0, cloneList.sortedValues, 0, size);
     }
@@ -92,8 +91,8 @@ public class LongTVList extends TVList {
     return cloneList;
   }
 
-  private long[] cloneValue(long[] array) {
-    long[] cloneArray = new long[array.length];
+  private double[] cloneValue(double[] array) {
+    double[] cloneArray = new double[array.length];
     System.arraycopy(array, 0, cloneArray, 0, array.length);
     return cloneArray;
   }
@@ -104,7 +103,7 @@ public class LongTVList extends TVList {
 
   public void sort() {
     sortedTimestamps = new long[size];
-    sortedValues = new long[size];
+    sortedValues = new double[size];
     sort(0, size);
     sorted = true;
     values = null;
@@ -118,22 +117,22 @@ public class LongTVList extends TVList {
 
   protected void set(int src, int dest) {
     long srcT = getTime(src);
-    long srcV = getLong(src);
+    double srcV = getDouble(src);
     set(dest, srcT, srcV);
   }
 
   protected void setSorted(int src, int dest) {
     sortedTimestamps[dest] = getTime(src);
-    sortedValues[dest] = getLong(src);
+    sortedValues[dest] = getDouble(src);
   }
 
   protected void reverseRange(int lo, int hi) {
     hi--;
     while (lo < hi) {
       long loT = getTime(lo);
-      long loV = getLong(lo);
+      double loV = getDouble(lo);
       long hiT = getTime(hi);
-      long hiV = getLong(hi);
+      double hiV = getDouble(hi);
       set(lo++, hiT, hiV);
       set(hi--, loT, loV);
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 215adbe..6cec61f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -30,7 +30,6 @@ public class LongTVList extends TVList {
   public LongTVList() {
     super();
     values = new ArrayList<>();
-
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 6e6b7c4..b0812b3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -117,7 +117,7 @@ public abstract class TVList {
 
   protected abstract void reverseRange(int lo, int hi);
 
-  protected abstract TVList clone();
+  public abstract TVList clone();
 
   protected long[] cloneTime(long[] array) {
     long[] cloneArray = new long[array.length];
@@ -166,6 +166,7 @@ public abstract class TVList {
       case INT64:
         return new LongTVList();
       case DOUBLE:
+        return new DoubleTVList();
       case BOOLEAN:
     }
     return null;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index 313c6fb..76f8d81 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -51,7 +51,6 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
           return;
         }
         if (logger.isDebugEnabled()) {
-          logger.debug("Timed force starts, {} nodes to be flushed", nodeMap.size());
         }
 
         for (WriteLogNode node : nodeMap.values()) {
@@ -61,7 +60,6 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
             logger.error("Cannot force {}, because ", node, e);
           }
         }
-        logger.debug("Timed force finished");
         try {
           Thread.sleep(config.getForceWalPeriodInMs());
         } catch (InterruptedException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index b9b5a82..054e7dd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -251,7 +251,6 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     }
     try {
       long start = System.currentTimeMillis();
-      LOGGER.debug("Log node {} starts force, {} logs to be forced", identifier, bufferedLogNum);
       try {
         if (currentFileWriter != null) {
           currentFileWriter.force();
@@ -259,7 +258,6 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
       } catch (IOException e) {
         LOGGER.error("Log node {} force failed.", identifier, e);
       }
-      LOGGER.debug("Log node {} ends force.", identifier);
       long elapse = System.currentTimeMillis() - start;
       if (elapse > 1000) {
         LOGGER.info("WAL forceWal cost {} ms", elapse);
@@ -278,7 +276,6 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     }
     try {
       long start = System.currentTimeMillis();
-      LOGGER.debug("Log node {} starts sync, {} logs to be synced", identifier, bufferedLogNum);
       if (bufferedLogNum == 0) {
         return;
       }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 4c788a0..11f4a1d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushTaskV2;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -110,9 +111,9 @@ public class TsFileRecoverPerformer {
     }
 
     // flush logs
-    MemTableFlushTask tableFlushTask = new MemTableFlushTask(restorableTsFileIOWriter,
-        logNodePrefix, 0, (a,b) -> {});
-    tableFlushTask.flushMemTable(fileSchema, recoverMemTable, versionController.nextVersion());
+    MemTableFlushTaskV2 tableFlushTask = new MemTableFlushTaskV2(recoverMemTable, fileSchema, restorableTsFileIOWriter,
+        logNodePrefix, (a) -> {});
+    tableFlushTask.flushMemTable();
 
     // close file
     try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index aeb789e..962192f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.tsfile.utils.Binary;
 public class BatchData implements Serializable {
 
   private static final long serialVersionUID = -4620310601188394839L;
-  private int timeCapacity = 1;
-  private int valueCapacity = 1;
+  private int timeCapacity = 16;
+  private int valueCapacity = 16;
   private int emptyTimeCapacity = 1;
   private int capacityThreshold = 1024;