You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/17 03:22:29 UTC

[iotdb] 01/02: Introduce IWritableMemChunkGroup

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch IWritableMemChunkGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0cd21e12749228eea857e70829ef305cac412ee2
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Nov 17 11:20:01 2021 +0800

    Introduce IWritableMemChunkGroup
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   9 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   5 +-
 .../iotdb/db/engine/flush/NotifyFlushMemTable.java |  12 --
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 237 +++++++++------------
 .../engine/memtable/AlignedWritableMemChunk.java   | 104 +++++----
 .../memtable/AlignedWritableMemChunkGroup.java     |  62 ++++++
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   9 +-
 .../db/engine/memtable/IWritableMemChunk.java      |   7 +-
 .../db/engine/memtable/IWritableMemChunkGroup.java |  30 +++
 .../db/engine/memtable/PrimitiveMemTable.java      |  17 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  12 +-
 .../db/engine/memtable/WritableMemChunkGroup.java  |  99 +++++++++
 .../db/engine/storagegroup/TsFileProcessor.java    |  29 +--
 .../apache/iotdb/db/metadata/path/AlignedPath.java |   7 +-
 .../iotdb/db/metadata/path/MeasurementPath.java    |   8 +-
 .../apache/iotdb/db/metadata/path/PartialPath.java |   4 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |   6 +-
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |  15 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  22 +-
 .../db/engine/memtable/MemTableTestUtils.java      |   6 +-
 .../db/engine/memtable/MemtableBenchmark.java      |   7 +-
 .../db/engine/memtable/PrimitiveMemTableTest.java  |  14 +-
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |  28 +++
 23 files changed, 481 insertions(+), 268 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 17deb09..1d0ae09 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1281,13 +1281,10 @@ public class IoTDBDescriptor {
                   + queryMemoryAllocateProportion);
         }
       }
-    }
 
-    conf.setMaxQueryDeduplicatedPathNum(
-        Integer.parseInt(
-            properties.getProperty(
-                "max_deduplicated_path_num",
-                Integer.toString(conf.getMaxQueryDeduplicatedPathNum()))));
+      conf.setMaxQueryDeduplicatedPathNum(
+          Integer.parseInt(properties.getProperty("max_deduplicated_path_num")));
+    }
   }
 
   @SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 957d03e..ecbdac8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -101,11 +102,11 @@ public class MemTableFlushTask {
     long sortTime = 0;
 
     // for map do not use get(key) to iterate
-    for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
+    for (Map.Entry<String, IWritableMemChunkGroup> memTableEntry :
         memTable.getMemTableMap().entrySet()) {
       encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
 
-      final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
+      final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 169c22f..8d80b8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.engine.flush;
 
 import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 /**
  * Only used in sync flush and async close to start a flush task This memtable is not managed by
@@ -30,16 +28,6 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 public class NotifyFlushMemTable extends AbstractMemTable {
 
   @Override
-  protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
-    return null;
-  }
-
-  @Override
-  protected IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema) {
-    return null;
-  }
-
-  @Override
   public IMemTable copy() {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 72c9b72..c5e8ae2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -22,20 +22,13 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.rescon.TVListAllocator;
 import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,7 +40,7 @@ import java.util.Map.Entry;
 
 public abstract class AbstractMemTable implements IMemTable {
 
-  private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
+  private final Map<String, IWritableMemChunkGroup> memTableMap;
   /**
    * The initial value is true because we want calculate the text data size when recover memTable!!
    */
@@ -80,12 +73,12 @@ public abstract class AbstractMemTable implements IMemTable {
     this.memTableMap = new HashMap<>();
   }
 
-  public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+  public AbstractMemTable(Map<String, IWritableMemChunkGroup> memTableMap) {
     this.memTableMap = memTableMap;
   }
 
   @Override
-  public Map<String, Map<String, IWritableMemChunk>> getMemTableMap() {
+  public Map<String, IWritableMemChunkGroup> getMemTableMap() {
     return memTableMap;
   }
 
@@ -93,63 +86,64 @@ public abstract class AbstractMemTable implements IMemTable {
    * create this MemChunk if it's not exist
    *
    * @param deviceId device id
-   * @param schema measurement schema
-   * @return this MemChunk
+   * @param schemaList measurement schemaList
+   * @return this MemChunkGroup
    */
-  private IWritableMemChunk createMemChunkIfNotExistAndGet(
-      String deviceId, IMeasurementSchema schema) {
-    Map<String, IWritableMemChunk> memSeries =
-        memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
-
-    return memSeries.computeIfAbsent(
-        schema.getMeasurementId(),
-        k -> {
-          seriesNumber++;
-          totalPointsNumThreshold += avgSeriesPointNumThreshold;
-          return genMemSeries(schema);
-        });
-  }
-
-  private IWritableMemChunk createAlignedMemChunkIfNotExistAndGet(
-      String deviceId, IMeasurementSchema schema) {
-    Map<String, IWritableMemChunk> memSeries =
-        memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
-
-    VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema;
-    return memSeries.computeIfAbsent(
-        vectorSchema.getMeasurementId(),
-        k -> {
-          seriesNumber++;
-          totalPointsNumThreshold +=
-              avgSeriesPointNumThreshold * vectorSchema.getSubMeasurementsCount();
-          return genAlignedMemSeries(vectorSchema);
-        });
+  private IWritableMemChunkGroup createMemChunkGroupIfNotExistAndGet(
+      String deviceId, List<IMeasurementSchema> schemaList) {
+    IWritableMemChunkGroup memChunkGroup =
+        memTableMap.computeIfAbsent(
+            deviceId,
+            k -> {
+              seriesNumber++;
+              totalPointsNumThreshold += avgSeriesPointNumThreshold;
+              return new WritableMemChunkGroup(schemaList);
+            });
+    for (IMeasurementSchema schema : schemaList) {
+      if (!memChunkGroup.contains(schema.getMeasurementId())) {
+        seriesNumber++;
+        totalPointsNumThreshold += avgSeriesPointNumThreshold;
+      }
+    }
+    return memChunkGroup;
+  }
+
+  private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
+      String deviceId, List<IMeasurementSchema> schemaList) {
+    IWritableMemChunkGroup memChunkGroup =
+        memTableMap.computeIfAbsent(
+            deviceId,
+            k -> {
+              seriesNumber++;
+              totalPointsNumThreshold += avgSeriesPointNumThreshold;
+              return new AlignedWritableMemChunkGroup(schemaList);
+            });
+    for (IMeasurementSchema schema : schemaList) {
+      if (!memChunkGroup.contains(schema.getMeasurementId())) {
+        seriesNumber++;
+        totalPointsNumThreshold += avgSeriesPointNumThreshold;
+      }
+    }
+    return memChunkGroup;
   }
 
-  protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema);
-
-  protected abstract IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema);
-
   @Override
   public void insert(InsertRowPlan insertRowPlan) {
     updatePlanIndexes(insertRowPlan.getIndex());
     Object[] values = insertRowPlan.getValues();
 
-    IMeasurementMNode[] measurementMNodes = insertRowPlan.getMeasurementMNodes();
-    for (int i = 0; i < measurementMNodes.length; i++) {
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
       if (values[i] == null) {
         continue;
       }
-      memSize +=
-          MemUtils.getRecordSize(
-              measurementMNodes[i].getSchema().getType(), values[i], disableMemControl);
-
-      write(
-          insertRowPlan.getDeviceId().getFullPath(),
-          measurementMNodes[i].getSchema(),
-          insertRowPlan.getTime(),
-          values[i]);
+      IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
+      schemaList.add(schema);
+      dataTypes.add(schema.getType());
     }
+    memSize += MemUtils.getRecordsSize(dataTypes, values, disableMemControl);
+    write(insertRowPlan.getDeviceId().getFullPath(), schemaList, insertRowPlan.getTime(), values);
     totalPointsNum +=
         insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
   }
@@ -158,34 +152,24 @@ public abstract class AbstractMemTable implements IMemTable {
   public void insertAlignedRow(InsertRowPlan insertRowPlan) {
     updatePlanIndexes(insertRowPlan.getIndex());
     // write vector
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    List<TSEncoding> encodings = new ArrayList<>();
-    CompressionType compressionType = null;
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
     for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
-      if (insertRowPlan.getMeasurements()[i] == null) {
+      if (insertRowPlan.getValues()[i] == null) {
         continue;
       }
       IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
-      measurements.add(schema.getMeasurementId());
-      types.add(schema.getType());
-      encodings.add(schema.getEncodingType());
-      compressionType = schema.getCompressor();
+      schemaList.add(schema);
+      dataTypes.add(schema.getType());
     }
-    if (measurements.isEmpty()) {
+    if (schemaList.isEmpty()) {
       return;
     }
-    VectorMeasurementSchema vectorSchema =
-        new VectorMeasurementSchema(
-            AlignedPath.VECTOR_PLACEHOLDER,
-            measurements.toArray(new String[measurements.size()]),
-            types.toArray(new TSDataType[measurements.size()]),
-            encodings.toArray(new TSEncoding[measurements.size()]),
-            compressionType);
-    memSize += MemUtils.getAlignedRecordSize(types, insertRowPlan.getValues(), disableMemControl);
+    memSize +=
+        MemUtils.getAlignedRecordsSize(dataTypes, insertRowPlan.getValues(), disableMemControl);
     writeAlignedRow(
         insertRowPlan.getDeviceId().getFullPath(),
-        vectorSchema,
+        schemaList,
         insertRowPlan.getTime(),
         insertRowPlan.getValues());
     totalPointsNum +=
@@ -224,92 +208,81 @@ public abstract class AbstractMemTable implements IMemTable {
 
   @Override
   public void write(
-      String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue) {
-    IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(deviceId, schema);
-    memSeries.write(insertTime, objectValue);
+      String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue) {
+    IWritableMemChunkGroup memChunkGroup =
+        createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
+    memChunkGroup.write(insertTime, objectValue, schemaList);
   }
 
   @Override
   public void writeAlignedRow(
-      String deviceId, IMeasurementSchema schema, long insertTime, Object[] objectValue) {
-    IWritableMemChunk memSeries = createAlignedMemChunkIfNotExistAndGet(deviceId, schema);
-    memSeries.writeAlignedValue(insertTime, objectValue, schema);
+      String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue) {
+    IWritableMemChunkGroup memChunkGroup =
+        createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
+    memChunkGroup.write(insertTime, objectValue, schemaList);
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
   @Override
   public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
-    updatePlanIndexes(insertTabletPlan.getIndex());
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
       if (insertTabletPlan.getColumns()[i] == null) {
         continue;
       }
-      IWritableMemChunk memSeries =
-          createMemChunkIfNotExistAndGet(
-              insertTabletPlan.getDeviceId().getFullPath(),
-              insertTabletPlan.getMeasurementMNodes()[i].getSchema());
-      memSeries.write(
-          insertTabletPlan.getTimes(),
-          insertTabletPlan.getColumns()[i],
-          insertTabletPlan.getBitMaps() != null ? insertTabletPlan.getBitMaps()[i] : null,
-          insertTabletPlan.getDataTypes()[i],
-          start,
-          end);
+      IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[i].getSchema();
+      schemaList.add(schema);
     }
+    IWritableMemChunkGroup memChunkGroup =
+        createMemChunkGroupIfNotExistAndGet(
+            insertTabletPlan.getDeviceId().getFullPath(), schemaList);
+    memChunkGroup.writeValues(
+        insertTabletPlan.getTimes(),
+        insertTabletPlan.getColumns(),
+        insertTabletPlan.getBitMaps(),
+        schemaList,
+        start,
+        end);
   }
 
+  @Override
   public void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int end) {
-    updatePlanIndexes(insertTabletPlan.getIndex());
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    List<TSEncoding> encodings = new ArrayList<>();
-    CompressionType compressionType = null;
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
       if (insertTabletPlan.getColumns()[i] == null) {
         continue;
       }
       IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[i].getSchema();
-      measurements.add(schema.getMeasurementId());
-      types.add(schema.getType());
-      encodings.add(schema.getEncodingType());
-      compressionType = schema.getCompressor();
+      schemaList.add(schema);
     }
-    if (measurements.isEmpty()) {
+    if (schemaList.isEmpty()) {
       return;
     }
-    VectorMeasurementSchema vectorSchema =
-        new VectorMeasurementSchema(
-            AlignedPath.VECTOR_PLACEHOLDER,
-            measurements.toArray(new String[measurements.size()]),
-            types.toArray(new TSDataType[measurements.size()]),
-            encodings.toArray(new TSEncoding[measurements.size()]),
-            compressionType);
-    IWritableMemChunk memSeries =
-        createAlignedMemChunkIfNotExistAndGet(
-            insertTabletPlan.getDeviceId().getFullPath(), vectorSchema);
-    memSeries.writeAlignedValues(
+    IWritableMemChunkGroup memChunkGroup =
+        createAlignedMemChunkGroupIfNotExistAndGet(
+            insertTabletPlan.getDeviceId().getFullPath(), schemaList);
+    memChunkGroup.writeValues(
         insertTabletPlan.getTimes(),
         insertTabletPlan.getColumns(),
         insertTabletPlan.getBitMaps(),
-        vectorSchema,
+        schemaList,
         start,
         end);
   }
 
   @Override
   public boolean checkIfChunkDoesNotExist(String deviceId, String measurement) {
-    Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
-    if (null == memSeries) {
+    IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+    if (null == memChunkGroup) {
       return true;
     }
-    return !memSeries.containsKey(measurement);
+    return !memChunkGroup.contains(measurement);
   }
 
   @Override
-  public int getCurrentChunkPointNum(String deviceId, String measurement) {
-    Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
-    IWritableMemChunk memChunk = memSeries.get(measurement);
-    return (int) memChunk.count();
+  public long getCurrentChunkPointNum(String deviceId, String measurement) {
+    IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+    return memChunkGroup.getCurrentChunkPointNum(measurement);
   }
 
   @Override
@@ -325,10 +298,8 @@ public abstract class AbstractMemTable implements IMemTable {
   @Override
   public long size() {
     long sum = 0;
-    for (Map<String, IWritableMemChunk> seriesMap : memTableMap.values()) {
-      for (IWritableMemChunk writableMemChunk : seriesMap.values()) {
-        sum += writableMemChunk.count();
-      }
+    for (IWritableMemChunkGroup writableMemChunkGroup : memTableMap.values()) {
+      sum += writableMemChunkGroup.count();
     }
     return sum;
   }
@@ -373,12 +344,13 @@ public abstract class AbstractMemTable implements IMemTable {
   @Override
   public void delete(
       PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
-    Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath());
-    if (deviceMap == null) {
+    IWritableMemChunkGroup memChunkGroup = memTableMap.get(devicePath.getFullPath());
+    if (memChunkGroup == null) {
       return;
     }
 
-    Iterator<Entry<String, IWritableMemChunk>> iter = deviceMap.entrySet().iterator();
+    Iterator<Entry<String, IWritableMemChunk>> iter =
+        memChunkGroup.getMemChunkMap().entrySet().iterator();
     while (iter.hasNext()) {
       Entry<String, IWritableMemChunk> entry = iter.next();
       IWritableMemChunk chunk = entry.getValue();
@@ -433,13 +405,8 @@ public abstract class AbstractMemTable implements IMemTable {
 
   @Override
   public void release() {
-    for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) {
-      for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) {
-        TVList list = subEntry.getValue().getTVList();
-        if (list.getReferenceCount() == 0) {
-          TVListAllocator.getInstance().release(list);
-        }
-      }
+    for (Entry<String, IWritableMemChunkGroup> entry : memTableMap.entrySet()) {
+      entry.getValue().release();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index cd7184b..69ebabb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -23,64 +23,71 @@ import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class AlignedWritableMemChunk implements IWritableMemChunk {
 
-  private final VectorMeasurementSchema schema;
+  private final Map<String, Integer> measurementIndexMap;
+  private final List<IMeasurementSchema> schemaList;
   private AlignedTVList list;
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
   private static final Logger LOGGER = LoggerFactory.getLogger(AlignedWritableMemChunk.class);
 
-  public AlignedWritableMemChunk(VectorMeasurementSchema schema) {
-    this.schema = schema;
-    this.list = TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList());
+  public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
+    this.measurementIndexMap = new LinkedHashMap<>();
+    List<TSDataType> dataTypeList = new ArrayList<>();
+    this.schemaList = schemaList;
+    for (int i = 0; i < schemaList.size(); i++) {
+      measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
+      dataTypeList.add(schemaList.get(i).getType());
+    }
+    this.list = TVListAllocator.getInstance().allocate(dataTypeList);
   }
 
   public boolean containsMeasurement(String measurementId) {
-    return schema.containsSubMeasurement(measurementId);
+    return measurementIndexMap.containsKey(measurementId);
   }
 
   @Override
   public void putLong(long t, long v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putInt(long t, int v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putFloat(long t, float v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putDouble(long t, double v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putBinary(long t, Binary v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putBoolean(long t, boolean v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
@@ -90,32 +97,32 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
@@ -126,19 +133,20 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public void write(long insertTime, Object objectValue) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
-  public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) {
-    int[] columnIndexArray = checkColumnsInInsertPlan(schema);
+  public void writeAlignedValue(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+    int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
     putAlignedValue(insertTime, objectValue, columnIndexArray);
   }
 
   @Override
   public void write(
       long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
@@ -146,32 +154,29 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
-      IMeasurementSchema schema,
+      List<IMeasurementSchema> schemaList,
       int start,
       int end) {
-    int[] columnIndexArray = checkColumnsInInsertPlan(schema);
+    int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
     putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end);
   }
 
-  private int[] checkColumnsInInsertPlan(IMeasurementSchema schema) {
-    VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema;
-    List<String> measurementIdsInInsertPlan = vectorSchema.getSubMeasurementsList();
-    List<TSDataType> dataTypesInInsertPlan = vectorSchema.getSubMeasurementsTSDataTypeList();
-    List<TSEncoding> encodingsInInsertPlan = vectorSchema.getSubMeasurementsTSEncodingList();
-    for (int i = 0; i < measurementIdsInInsertPlan.size(); i++) {
-      if (!containsMeasurement(measurementIdsInInsertPlan.get(i))) {
-        this.schema.addMeasurement(
-            measurementIdsInInsertPlan.get(i),
-            dataTypesInInsertPlan.get(i),
-            encodingsInInsertPlan.get(i));
-        this.list.extendColumn(dataTypesInInsertPlan.get(i));
+  private int[] checkColumnsInInsertPlan(List<IMeasurementSchema> schemaListInInsertPlan) {
+    List<String> measurementIdsInInsertPlan = new ArrayList<>();
+    for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
+      measurementIdsInInsertPlan.add(schemaListInInsertPlan.get(i).getMeasurementId());
+      if (!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) {
+        this.measurementIndexMap.put(
+            schemaListInInsertPlan.get(i).getMeasurementId(), measurementIndexMap.size());
+        this.schemaList.add(schemaListInInsertPlan.get(i));
+        this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
       }
     }
-    List<String> measurementIdsInTVList = this.schema.getSubMeasurementsList();
-    int[] columnIndexArray = new int[measurementIdsInTVList.size()];
-    for (int i = 0; i < columnIndexArray.length; i++) {
-      columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementIdsInTVList.get(i));
-    }
+    int[] columnIndexArray = new int[measurementIndexMap.size()];
+    measurementIndexMap.forEach(
+        (measurementId, i) -> {
+          columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementId);
+        });
     return columnIndexArray;
   }
 
@@ -182,7 +187,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public long count() {
-    return (long) list.size() * schema.getSubMeasurementsCount();
+    return (long) list.size() * measurementIndexMap.size();
   }
 
   public long alignedListSize() {
@@ -191,7 +196,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public IMeasurementSchema getSchema() {
-    return schema;
+    return null;
   }
 
   @Override
@@ -209,7 +214,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
     list.increaseReferenceCount();
     List<Integer> columnIndexList = new ArrayList<>();
     for (IMeasurementSchema measurementSchema : schemaList) {
-      columnIndexList.add(schema.getSubMeasurementIndex(measurementSchema.getMeasurementId()));
+      columnIndexList.add(measurementIndexMap.get(measurementSchema.getMeasurementId()));
     }
     return list.getTvListByColumnIndex(columnIndexList);
   }
@@ -243,7 +248,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public IChunkWriter createIChunkWriter() {
-    return new AlignedChunkWriterImpl(schema);
+    return new AlignedChunkWriterImpl(schemaList);
   }
 
   @Override
@@ -310,4 +315,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
       timeDuplicateAlignedRowIndexList = null;
     }
   }
+
+  @Override
+  public void release() {
+    if (list.getReferenceCount() == 0) {
+      TVListAllocator.getInstance().release(list);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
new file mode 100644
index 0000000..a15fb6f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -0,0 +1,62 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
+
+  private AlignedWritableMemChunk memChunk;
+
+  public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList) {
+    memChunk = new AlignedWritableMemChunk(schemaList);
+  }
+
+  @Override
+  public void writeValues(
+      long[] times,
+      Object[] columns,
+      BitMap[] bitMaps,
+      List<IMeasurementSchema> schemaList,
+      int start,
+      int end) {
+    memChunk.writeAlignedValues(times, columns, bitMaps, schemaList, start, end);
+  }
+
+  @Override
+  public void release() {
+    memChunk.release();
+  }
+
+  @Override
+  public long count() {
+    return memChunk.count();
+  }
+
+  @Override
+  public boolean contains(String measurement) {
+    return memChunk.containsMeasurement(measurement);
+  }
+
+  @Override
+  public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+    memChunk.writeAlignedValue(insertTime, objectValue, schemaList);
+  }
+
+  @Override
+  public Map<String, IWritableMemChunk> getMemChunkMap() {
+    return Collections.singletonMap("", memChunk);
+  }
+
+  @Override
+  public long getCurrentChunkPointNum(String measurement) {
+    return memChunk.count();
+  }
+
+  public AlignedWritableMemChunk getAlignedMemChunk() {
+    return memChunk;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 1dc2446..e05b1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -42,12 +42,13 @@ import java.util.Map;
  */
 public interface IMemTable {
 
-  Map<String, Map<String, IWritableMemChunk>> getMemTableMap();
+  Map<String, IWritableMemChunkGroup> getMemTableMap();
 
-  void write(String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue);
+  void write(
+      String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue);
 
   void writeAlignedRow(
-      String deviceId, IMeasurementSchema schema, long insertTime, Object[] objectValue);
+      String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue);
   /**
    * write data in the range [start, end). Null value in each column values will be replaced by the
    * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
@@ -145,7 +146,7 @@ public interface IMemTable {
   boolean checkIfChunkDoesNotExist(String deviceId, String measurement);
 
   /** only used when mem control enabled */
-  int getCurrentChunkPointNum(String deviceId, String measurement);
+  long getCurrentChunkPointNum(String deviceId, String measurement);
 
   /** only used when mem control enabled */
   void addTextDataSize(long textDataIncrement);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index ee10ea1..ce8b6f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -60,7 +60,8 @@ public interface IWritableMemChunk {
 
   void write(long insertTime, Object objectValue);
 
-  void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema);
+  void writeAlignedValue(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
 
   /**
    * write data in the range [start, end). Null value in the valueList will be replaced by the
@@ -73,7 +74,7 @@ public interface IWritableMemChunk {
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
-      IMeasurementSchema schema,
+      List<IMeasurementSchema> schemaList,
       int start,
       int end);
 
@@ -129,4 +130,6 @@ public interface IWritableMemChunk {
   IChunkWriter createIChunkWriter();
 
   void encode(IChunkWriter chunkWriter);
+
+  void release();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
new file mode 100644
index 0000000..ffbd65c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
+
+public interface IWritableMemChunkGroup {
+
+  void writeValues(
+      long[] times,
+      Object[] columns,
+      BitMap[] bitMaps,
+      List<IMeasurementSchema> schemaList,
+      int start,
+      int end);
+
+  void release();
+
+  long count();
+
+  boolean contains(String measurement);
+
+  void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
+
+  Map<String, IWritableMemChunk> getMemChunkMap();
+
+  long getCurrentChunkPointNum(String measurement);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 6915967..425fc21 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -19,9 +19,6 @@
 
 package org.apache.iotdb.db.engine.memtable;
 
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -33,23 +30,13 @@ public class PrimitiveMemTable extends AbstractMemTable {
     this.disableMemControl = !enableMemControl;
   }
 
-  public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+  public PrimitiveMemTable(Map<String, IWritableMemChunkGroup> memTableMap) {
     super(memTableMap);
   }
 
   @Override
-  protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
-    return new WritableMemChunk(schema);
-  }
-
-  @Override
-  protected IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema) {
-    return new AlignedWritableMemChunk((VectorMeasurementSchema) schema);
-  }
-
-  @Override
   public IMemTable copy() {
-    Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>(getMemTableMap());
+    Map<String, IWritableMemChunkGroup> newMap = new HashMap<>(getMemTableMap());
 
     return new PrimitiveMemTable(newMap);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 3b096ad..51c7749 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -72,7 +72,8 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) {
+  public void writeAlignedValue(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
   }
 
@@ -114,7 +115,7 @@ public class WritableMemChunk implements IWritableMemChunk {
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
-      IMeasurementSchema schema,
+      List<IMeasurementSchema> schemaList,
       int start,
       int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
@@ -346,4 +347,11 @@ public class WritableMemChunk implements IWritableMemChunk {
       }
     }
   }
+
+  @Override
+  public void release() {
+    //        if (list.getReferenceCount() == 0) {
+    //          TVListAllocator.getInstance().release(list);
+    //        }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
new file mode 100644
index 0000000..5845da3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -0,0 +1,99 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WritableMemChunkGroup implements IWritableMemChunkGroup {
+
+  private Map<String, IWritableMemChunk> memChunkMap;
+
+  public WritableMemChunkGroup(List<IMeasurementSchema> schemaList) {
+    memChunkMap = new HashMap<>();
+    for (IMeasurementSchema schema : schemaList) {
+      createMemChunkIfNotExistAndGet(schema);
+    }
+  }
+
+  @Override
+  public void writeValues(
+      long[] times,
+      Object[] columns,
+      BitMap[] bitMaps,
+      List<IMeasurementSchema> schemaList,
+      int start,
+      int end) {
+    int emptyColumnCount = 0;
+    for (int i = 0; i < columns.length; i++) {
+      if (columns[i] == null) {
+        emptyColumnCount++;
+        continue;
+      }
+      IWritableMemChunk memChunk =
+          createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount));
+      memChunk.write(
+          times,
+          columns[i],
+          bitMaps == null ? null : bitMaps[i],
+          schemaList.get(i - emptyColumnCount).getType(),
+          start,
+          end);
+    }
+  }
+
+  private IWritableMemChunk createMemChunkIfNotExistAndGet(IMeasurementSchema schema) {
+    return memChunkMap.computeIfAbsent(
+        schema.getMeasurementId(),
+        k -> {
+          return new WritableMemChunk(schema);
+        });
+  }
+
+  @Override
+  public void release() {
+    for (IWritableMemChunk memChunk : memChunkMap.values()) {
+      memChunk.release();
+    }
+  }
+
+  @Override
+  public long count() {
+    long count = 0;
+    for (IWritableMemChunk memChunk : memChunkMap.values()) {
+      count += memChunk.count();
+    }
+    return count;
+  }
+
+  @Override
+  public boolean contains(String measurement) {
+    return memChunkMap.containsKey(measurement);
+  }
+
+  @Override
+  public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+    int emptyColumnCount = 0;
+    for (int i = 0; i < objectValue.length; i++) {
+      if (objectValue[i] == null) {
+        emptyColumnCount++;
+        continue;
+      }
+      IWritableMemChunk memChunk =
+          createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount));
+      memChunk.write(insertTime, objectValue[i]);
+    }
+  }
+
+  @Override
+  public Map<String, IWritableMemChunk> getMemChunkMap() {
+    return memChunkMap;
+  }
+
+  @Override
+  public long getCurrentChunkPointNum(String measurement) {
+    return memChunkMap.get(measurement).count();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 93d020f..5908597 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
 import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
@@ -355,7 +356,7 @@ public class TsFileProcessor {
         memTableIncrement += TVList.tvListArrayMemSize(insertRowPlan.getDataTypes()[i]);
       } else {
         // here currentChunkPointNum >= 1
-        int currentChunkPointNum =
+        long currentChunkPointNum =
             workMemTable.getCurrentChunkPointNum(deviceId, insertRowPlan.getMeasurements()[i]);
         memTableIncrement +=
             (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
@@ -378,7 +379,7 @@ public class TsFileProcessor {
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
-    AlignedWritableMemChunk vectorMemChunk = null;
+    AlignedWritableMemChunk alignedMemChunk = null;
     String deviceId = insertRowPlan.getDeviceId().getFullPath();
     if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
       // ChunkMetadataIncrement
@@ -388,15 +389,15 @@ public class TsFileProcessor {
       memTableIncrement += AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes());
     } else {
       // here currentChunkPointNum >= 1
-      int currentChunkPointNum =
+      long currentChunkPointNum =
           workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
       memTableIncrement +=
           (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
               ? AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes())
               : 0;
-      vectorMemChunk =
-          ((AlignedWritableMemChunk)
-              workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER));
+      alignedMemChunk =
+          ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId))
+              .getAlignedMemChunk();
     }
     for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
       // skip failed Measurements
@@ -404,10 +405,10 @@ public class TsFileProcessor {
         continue;
       }
       // extending the column of aligned mem chunk
-      if (vectorMemChunk != null
-          && !vectorMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
+      if (alignedMemChunk != null
+          && !alignedMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
         memTableIncrement +=
-            (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
+            (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
                 * insertRowPlan.getDataTypes()[i].getDataTypeSize();
       }
       // TEXT data mem size
@@ -486,13 +487,13 @@ public class TsFileProcessor {
           ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
               * TVList.tvListArrayMemSize(dataType);
     } else {
-      int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, measurement);
+      long currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, measurement);
       if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
         memIncrements[0] +=
             ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
                 * TVList.tvListArrayMemSize(dataType);
       } else {
-        int acquireArray =
+        long acquireArray =
             (end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
                 / PrimitiveArrayManager.ARRAY_SIZE;
         memIncrements[0] +=
@@ -526,7 +527,7 @@ public class TsFileProcessor {
               * AlignedTVList.alignedTvListArrayMemSize(dataTypes);
     } else {
       int currentChunkPointNum =
-          workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
+          (int) workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
       if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
         memIncrements[0] +=
             ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -541,8 +542,8 @@ public class TsFileProcessor {
                 : acquireArray * AlignedTVList.alignedTvListArrayMemSize(dataTypes);
       }
       vectorMemChunk =
-          ((AlignedWritableMemChunk)
-              workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER));
+          ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId))
+              .getAlignedMemChunk();
     }
     for (int i = 0; i < dataTypes.length; i++) {
       TSDataType dataType = dataTypes[i];
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index ff51b18..f85338d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -20,7 +20,8 @@
 package org.apache.iotdb.db.metadata.path;
 
 import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk;
@@ -334,14 +335,14 @@ public class AlignedPath extends PartialPath {
 
   @Override
   public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
-      Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+      Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
       throws QueryProcessException, IOException {
     // check If memtable contains this path
     if (!memTableMap.containsKey(getDevice())) {
       return null;
     }
     AlignedWritableMemChunk alignedMemChunk =
-        ((AlignedWritableMemChunk) memTableMap.get(getDevice()).get(VECTOR_PLACEHOLDER));
+        ((AlignedWritableMemChunkGroup) memTableMap.get(getDevice())).getAlignedMemChunk();
     boolean containsMeasurement = false;
     for (String measurement : measurementList) {
       if (alignedMemChunk.containsMeasurement(measurement)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index d8aef64..d6900de 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.metadata.path;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -246,14 +247,15 @@ public class MeasurementPath extends PartialPath {
 
   @Override
   public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
-      Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+      Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
       throws QueryProcessException, IOException {
     // check If Memtable Contains this path
     if (!memTableMap.containsKey(getDevice())
-        || !memTableMap.get(getDevice()).containsKey(getMeasurement())) {
+        || !memTableMap.get(getDevice()).contains(getMeasurement())) {
       return null;
     }
-    IWritableMemChunk memChunk = memTableMap.get(getDevice()).get(getMeasurement());
+    IWritableMemChunk memChunk =
+        memTableMap.get(getDevice()).getMemChunkMap().get(getMeasurement());
     // get sorted tv list is synchronized so different query can get right sorted list reference
     TVList chunkCopy = memChunk.getSortedTvListForQuery();
     int curSize = chunkCopy.size();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index 90581d6..ae42486 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.metadata.path;
 
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -423,7 +423,7 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
    * @return ReadOnlyMemChunk
    */
   public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
-      Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+      Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
       throws QueryProcessException, IOException {
     throw new UnsupportedOperationException("Should call exact sub class!");
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 5fb024b..c29ebc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -241,11 +241,7 @@ public class InsertRowPlan extends InsertPlan {
           }
           continue;
         }
-        if (isAligned) {
-          dataTypes[i] = measurementMNodes[i].getSchema().getSubMeasurementsTSDataTypeList().get(i);
-        } else {
-          dataTypes[i] = measurementMNodes[i].getSchema().getType();
-        }
+        dataTypes[i] = measurementMNodes[i].getSchema().getType();
         try {
           values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
         } catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index e81a2ab..10617cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -56,10 +56,23 @@ public class MemUtils {
   }
 
   /**
+   * function for getting the value size. If mem control enabled, do not add text data size here,
+   * the size will be added to memtable before inserting.
+   */
+  public static long getRecordsSize(
+      List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
+    long memSize = 0L;
+    for (int i = 0; i < dataTypes.size(); i++) {
+      memSize += getRecordSize(dataTypes.get(i), value[i], addingTextDataSize);
+    }
+    return memSize;
+  }
+
+  /**
    * function for getting the vector value size. If mem control enabled, do not add text data size
    * here, the size will be added to memtable before inserting.
    */
-  public static long getAlignedRecordSize(
+  public static long getAlignedRecordsSize(
       List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
     // time and index size
     long memSize = 8L + 4L;
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 7ded716..836fdfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.writelog.recover;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.memtable.WritableMemChunk;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -126,11 +127,11 @@ public class LogReplayer {
       }
     }
 
-    Map<String, Map<String, IWritableMemChunk>> memTableMap = recoverMemTable.getMemTableMap();
-    for (Map.Entry<String, Map<String, IWritableMemChunk>> deviceEntry : memTableMap.entrySet()) {
+    Map<String, IWritableMemChunkGroup> memTableMap = recoverMemTable.getMemTableMap();
+    for (Map.Entry<String, IWritableMemChunkGroup> deviceEntry : memTableMap.entrySet()) {
       String deviceId = deviceEntry.getKey();
       for (Map.Entry<String, IWritableMemChunk> measurementEntry :
-          deviceEntry.getValue().entrySet()) {
+          deviceEntry.getValue().getMemChunkMap().entrySet()) {
         WritableMemChunk memChunk = (WritableMemChunk) measurementEntry.getValue();
         currentTsFileResource.updateStartTime(deviceId, memChunk.getFirstPoint());
         currentTsFileResource.updateEndTime(deviceId, memChunk.getLastPoint());
@@ -194,10 +195,19 @@ public class LogReplayer {
     // mark failed plan manually
     checkDataTypeAndMarkFailed(mNodes, plan);
     if (plan instanceof InsertRowPlan) {
-      recoverMemTable.insert((InsertRowPlan) plan);
+      if (plan.isAligned()) {
+        recoverMemTable.insertAlignedRow((InsertRowPlan) plan);
+      } else {
+        recoverMemTable.insert((InsertRowPlan) plan);
+      }
     } else {
-      recoverMemTable.insertTablet(
-          (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      if (plan.isAligned()) {
+        recoverMemTable.insertAlignedTablet(
+            (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      } else {
+        recoverMemTable.insertTablet(
+            (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
index 09b21c6..00a6fd1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class MemTableTestUtils {
@@ -62,9 +63,10 @@ public class MemTableTestUtils {
     for (long l = startTime; l <= endTime; l++) {
       iMemTable.write(
           deviceId,
-          new UnaryMeasurementSchema(measurementId, dataType, TSEncoding.PLAIN),
+          Collections.singletonList(
+              new UnaryMeasurementSchema(measurementId, dataType, TSEncoding.PLAIN)),
           l,
-          (int) l);
+          new Object[] {(int) l});
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
index c11cd90..086a647 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
+import java.util.Collections;
+
 /** Memtable insert benchmark. Bench the Memtable and get its performance. */
 public class MemtableBenchmark {
 
@@ -46,9 +48,10 @@ public class MemtableBenchmark {
       for (int j = 0; j < numOfMeasurement; j++) {
         memTable.write(
             deviceId,
-            new UnaryMeasurementSchema(measurementId[j], tsDataType, TSEncoding.PLAIN),
+            Collections.singletonList(
+                new UnaryMeasurementSchema(measurementId[j], tsDataType, TSEncoding.PLAIN)),
             System.nanoTime(),
-            String.valueOf(System.currentTimeMillis()));
+            new Object[] {String.valueOf(System.currentTimeMillis())});
       }
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index a1c925f..6be17a0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -119,16 +119,18 @@ public class PrimitiveMemTableTest {
     for (int i = 0; i < dataSize; i++) {
       memTable.write(
           deviceId,
-          new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN),
+          Collections.singletonList(
+              new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN)),
           dataSize - i - 1,
-          i + 10);
+          new Object[] {i + 10});
     }
     for (int i = 0; i < dataSize; i++) {
       memTable.write(
           deviceId,
-          new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN),
+          Collections.singletonList(
+              new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN)),
           i,
-          i);
+          new Object[] {i});
     }
     MeasurementPath fullPath =
         new MeasurementPath(
@@ -163,9 +165,9 @@ public class PrimitiveMemTableTest {
     for (TimeValuePair aRet : ret) {
       memTable.write(
           deviceId,
-          new UnaryMeasurementSchema(sensorId, dataType, encoding),
+          Collections.singletonList(new UnaryMeasurementSchema(sensorId, dataType, encoding)),
           aRet.getTimestamp(),
-          aRet.getValue().getValue());
+          new Object[] {aRet.getValue().getValue()});
     }
     MeasurementPath fullPath =
         new MeasurementPath(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 4c977ca..81999ed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iotdb.tsfile.write.chunk;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -63,6 +66,31 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
     this.valueIndex = 0;
   }
 
+  public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList) {
+    TSEncoding timeEncoding =
+        TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+    TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+    timeChunkWriter =
+        new TimeChunkWriter(
+            "",
+            schemaList.get(0).getCompressor(),
+            timeEncoding,
+            TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType));
+
+    valueChunkWriterList = new ArrayList<>(schemaList.size());
+    for (int i = 0; i < schemaList.size(); i++) {
+      valueChunkWriterList.add(
+          new ValueChunkWriter(
+              schemaList.get(i).getMeasurementId(),
+              schemaList.get(i).getCompressor(),
+              schemaList.get(i).getType(),
+              schemaList.get(i).getEncodingType(),
+              schemaList.get(i).getValueEncoder()));
+    }
+
+    this.valueIndex = 0;
+  }
+
   public void write(long time, int value, boolean isNull) {
     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
   }