You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/17 03:22:28 UTC

[iotdb] branch IWritableMemChunkGroup created (now 0acc292)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch IWritableMemChunkGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 0acc292  Merge branch 'master' of https://github.com/apache/iotdb into IWritableMemChunkGroup

This branch includes the following new commits:

     new 0cd21e1  Introduce IWritableMemChunkGroup
     new 0acc292  Merge branch 'master' of https://github.com/apache/iotdb into IWritableMemChunkGroup

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/02: Introduce IWritableMemChunkGroup

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch IWritableMemChunkGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0cd21e12749228eea857e70829ef305cac412ee2
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Nov 17 11:20:01 2021 +0800

    Introduce IWritableMemChunkGroup
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   9 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   5 +-
 .../iotdb/db/engine/flush/NotifyFlushMemTable.java |  12 --
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 237 +++++++++------------
 .../engine/memtable/AlignedWritableMemChunk.java   | 104 +++++----
 .../memtable/AlignedWritableMemChunkGroup.java     |  62 ++++++
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   9 +-
 .../db/engine/memtable/IWritableMemChunk.java      |   7 +-
 .../db/engine/memtable/IWritableMemChunkGroup.java |  30 +++
 .../db/engine/memtable/PrimitiveMemTable.java      |  17 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  12 +-
 .../db/engine/memtable/WritableMemChunkGroup.java  |  99 +++++++++
 .../db/engine/storagegroup/TsFileProcessor.java    |  29 +--
 .../apache/iotdb/db/metadata/path/AlignedPath.java |   7 +-
 .../iotdb/db/metadata/path/MeasurementPath.java    |   8 +-
 .../apache/iotdb/db/metadata/path/PartialPath.java |   4 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |   6 +-
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |  15 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  22 +-
 .../db/engine/memtable/MemTableTestUtils.java      |   6 +-
 .../db/engine/memtable/MemtableBenchmark.java      |   7 +-
 .../db/engine/memtable/PrimitiveMemTableTest.java  |  14 +-
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |  28 +++
 23 files changed, 481 insertions(+), 268 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 17deb09..1d0ae09 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1281,13 +1281,10 @@ public class IoTDBDescriptor {
                   + queryMemoryAllocateProportion);
         }
       }
-    }
 
-    conf.setMaxQueryDeduplicatedPathNum(
-        Integer.parseInt(
-            properties.getProperty(
-                "max_deduplicated_path_num",
-                Integer.toString(conf.getMaxQueryDeduplicatedPathNum()))));
+      conf.setMaxQueryDeduplicatedPathNum(
+          Integer.parseInt(properties.getProperty("max_deduplicated_path_num")));
+    }
   }
 
   @SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 957d03e..ecbdac8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -101,11 +102,11 @@ public class MemTableFlushTask {
     long sortTime = 0;
 
     // for map do not use get(key) to iterate
-    for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
+    for (Map.Entry<String, IWritableMemChunkGroup> memTableEntry :
         memTable.getMemTableMap().entrySet()) {
       encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
 
-      final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
+      final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap();
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 169c22f..8d80b8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.engine.flush;
 
 import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 /**
  * Only used in sync flush and async close to start a flush task This memtable is not managed by
@@ -30,16 +28,6 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 public class NotifyFlushMemTable extends AbstractMemTable {
 
   @Override
-  protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
-    return null;
-  }
-
-  @Override
-  protected IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema) {
-    return null;
-  }
-
-  @Override
   public IMemTable copy() {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 72c9b72..c5e8ae2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -22,20 +22,13 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.rescon.TVListAllocator;
 import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,7 +40,7 @@ import java.util.Map.Entry;
 
 public abstract class AbstractMemTable implements IMemTable {
 
-  private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
+  private final Map<String, IWritableMemChunkGroup> memTableMap;
   /**
    * The initial value is true because we want calculate the text data size when recover memTable!!
    */
@@ -80,12 +73,12 @@ public abstract class AbstractMemTable implements IMemTable {
     this.memTableMap = new HashMap<>();
   }
 
-  public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+  public AbstractMemTable(Map<String, IWritableMemChunkGroup> memTableMap) {
     this.memTableMap = memTableMap;
   }
 
   @Override
-  public Map<String, Map<String, IWritableMemChunk>> getMemTableMap() {
+  public Map<String, IWritableMemChunkGroup> getMemTableMap() {
     return memTableMap;
   }
 
@@ -93,63 +86,64 @@ public abstract class AbstractMemTable implements IMemTable {
    * create this MemChunk if it's not exist
    *
    * @param deviceId device id
-   * @param schema measurement schema
-   * @return this MemChunk
+   * @param schemaList measurement schemaList
+   * @return this MemChunkGroup
    */
-  private IWritableMemChunk createMemChunkIfNotExistAndGet(
-      String deviceId, IMeasurementSchema schema) {
-    Map<String, IWritableMemChunk> memSeries =
-        memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
-
-    return memSeries.computeIfAbsent(
-        schema.getMeasurementId(),
-        k -> {
-          seriesNumber++;
-          totalPointsNumThreshold += avgSeriesPointNumThreshold;
-          return genMemSeries(schema);
-        });
-  }
-
-  private IWritableMemChunk createAlignedMemChunkIfNotExistAndGet(
-      String deviceId, IMeasurementSchema schema) {
-    Map<String, IWritableMemChunk> memSeries =
-        memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
-
-    VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema;
-    return memSeries.computeIfAbsent(
-        vectorSchema.getMeasurementId(),
-        k -> {
-          seriesNumber++;
-          totalPointsNumThreshold +=
-              avgSeriesPointNumThreshold * vectorSchema.getSubMeasurementsCount();
-          return genAlignedMemSeries(vectorSchema);
-        });
+  private IWritableMemChunkGroup createMemChunkGroupIfNotExistAndGet(
+      String deviceId, List<IMeasurementSchema> schemaList) {
+    IWritableMemChunkGroup memChunkGroup =
+        memTableMap.computeIfAbsent(
+            deviceId,
+            k -> {
+              seriesNumber++;
+              totalPointsNumThreshold += avgSeriesPointNumThreshold;
+              return new WritableMemChunkGroup(schemaList);
+            });
+    for (IMeasurementSchema schema : schemaList) {
+      if (!memChunkGroup.contains(schema.getMeasurementId())) {
+        seriesNumber++;
+        totalPointsNumThreshold += avgSeriesPointNumThreshold;
+      }
+    }
+    return memChunkGroup;
+  }
+
+  private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
+      String deviceId, List<IMeasurementSchema> schemaList) {
+    IWritableMemChunkGroup memChunkGroup =
+        memTableMap.computeIfAbsent(
+            deviceId,
+            k -> {
+              seriesNumber++;
+              totalPointsNumThreshold += avgSeriesPointNumThreshold;
+              return new AlignedWritableMemChunkGroup(schemaList);
+            });
+    for (IMeasurementSchema schema : schemaList) {
+      if (!memChunkGroup.contains(schema.getMeasurementId())) {
+        seriesNumber++;
+        totalPointsNumThreshold += avgSeriesPointNumThreshold;
+      }
+    }
+    return memChunkGroup;
   }
 
-  protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema);
-
-  protected abstract IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema);
-
   @Override
   public void insert(InsertRowPlan insertRowPlan) {
     updatePlanIndexes(insertRowPlan.getIndex());
     Object[] values = insertRowPlan.getValues();
 
-    IMeasurementMNode[] measurementMNodes = insertRowPlan.getMeasurementMNodes();
-    for (int i = 0; i < measurementMNodes.length; i++) {
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
       if (values[i] == null) {
         continue;
       }
-      memSize +=
-          MemUtils.getRecordSize(
-              measurementMNodes[i].getSchema().getType(), values[i], disableMemControl);
-
-      write(
-          insertRowPlan.getDeviceId().getFullPath(),
-          measurementMNodes[i].getSchema(),
-          insertRowPlan.getTime(),
-          values[i]);
+      IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
+      schemaList.add(schema);
+      dataTypes.add(schema.getType());
     }
+    memSize += MemUtils.getRecordsSize(dataTypes, values, disableMemControl);
+    write(insertRowPlan.getDeviceId().getFullPath(), schemaList, insertRowPlan.getTime(), values);
     totalPointsNum +=
         insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
   }
@@ -158,34 +152,24 @@ public abstract class AbstractMemTable implements IMemTable {
   public void insertAlignedRow(InsertRowPlan insertRowPlan) {
     updatePlanIndexes(insertRowPlan.getIndex());
     // write vector
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    List<TSEncoding> encodings = new ArrayList<>();
-    CompressionType compressionType = null;
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
     for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
-      if (insertRowPlan.getMeasurements()[i] == null) {
+      if (insertRowPlan.getValues()[i] == null) {
         continue;
       }
       IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
-      measurements.add(schema.getMeasurementId());
-      types.add(schema.getType());
-      encodings.add(schema.getEncodingType());
-      compressionType = schema.getCompressor();
+      schemaList.add(schema);
+      dataTypes.add(schema.getType());
     }
-    if (measurements.isEmpty()) {
+    if (schemaList.isEmpty()) {
       return;
     }
-    VectorMeasurementSchema vectorSchema =
-        new VectorMeasurementSchema(
-            AlignedPath.VECTOR_PLACEHOLDER,
-            measurements.toArray(new String[measurements.size()]),
-            types.toArray(new TSDataType[measurements.size()]),
-            encodings.toArray(new TSEncoding[measurements.size()]),
-            compressionType);
-    memSize += MemUtils.getAlignedRecordSize(types, insertRowPlan.getValues(), disableMemControl);
+    memSize +=
+        MemUtils.getAlignedRecordsSize(dataTypes, insertRowPlan.getValues(), disableMemControl);
     writeAlignedRow(
         insertRowPlan.getDeviceId().getFullPath(),
-        vectorSchema,
+        schemaList,
         insertRowPlan.getTime(),
         insertRowPlan.getValues());
     totalPointsNum +=
@@ -224,92 +208,81 @@ public abstract class AbstractMemTable implements IMemTable {
 
   @Override
   public void write(
-      String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue) {
-    IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(deviceId, schema);
-    memSeries.write(insertTime, objectValue);
+      String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue) {
+    IWritableMemChunkGroup memChunkGroup =
+        createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
+    memChunkGroup.write(insertTime, objectValue, schemaList);
   }
 
   @Override
   public void writeAlignedRow(
-      String deviceId, IMeasurementSchema schema, long insertTime, Object[] objectValue) {
-    IWritableMemChunk memSeries = createAlignedMemChunkIfNotExistAndGet(deviceId, schema);
-    memSeries.writeAlignedValue(insertTime, objectValue, schema);
+      String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue) {
+    IWritableMemChunkGroup memChunkGroup =
+        createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
+    memChunkGroup.write(insertTime, objectValue, schemaList);
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
   @Override
   public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
-    updatePlanIndexes(insertTabletPlan.getIndex());
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
       if (insertTabletPlan.getColumns()[i] == null) {
         continue;
       }
-      IWritableMemChunk memSeries =
-          createMemChunkIfNotExistAndGet(
-              insertTabletPlan.getDeviceId().getFullPath(),
-              insertTabletPlan.getMeasurementMNodes()[i].getSchema());
-      memSeries.write(
-          insertTabletPlan.getTimes(),
-          insertTabletPlan.getColumns()[i],
-          insertTabletPlan.getBitMaps() != null ? insertTabletPlan.getBitMaps()[i] : null,
-          insertTabletPlan.getDataTypes()[i],
-          start,
-          end);
+      IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[i].getSchema();
+      schemaList.add(schema);
     }
+    IWritableMemChunkGroup memChunkGroup =
+        createMemChunkGroupIfNotExistAndGet(
+            insertTabletPlan.getDeviceId().getFullPath(), schemaList);
+    memChunkGroup.writeValues(
+        insertTabletPlan.getTimes(),
+        insertTabletPlan.getColumns(),
+        insertTabletPlan.getBitMaps(),
+        schemaList,
+        start,
+        end);
   }
 
+  @Override
   public void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int end) {
-    updatePlanIndexes(insertTabletPlan.getIndex());
-    List<String> measurements = new ArrayList<>();
-    List<TSDataType> types = new ArrayList<>();
-    List<TSEncoding> encodings = new ArrayList<>();
-    CompressionType compressionType = null;
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
       if (insertTabletPlan.getColumns()[i] == null) {
         continue;
       }
       IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[i].getSchema();
-      measurements.add(schema.getMeasurementId());
-      types.add(schema.getType());
-      encodings.add(schema.getEncodingType());
-      compressionType = schema.getCompressor();
+      schemaList.add(schema);
     }
-    if (measurements.isEmpty()) {
+    if (schemaList.isEmpty()) {
       return;
     }
-    VectorMeasurementSchema vectorSchema =
-        new VectorMeasurementSchema(
-            AlignedPath.VECTOR_PLACEHOLDER,
-            measurements.toArray(new String[measurements.size()]),
-            types.toArray(new TSDataType[measurements.size()]),
-            encodings.toArray(new TSEncoding[measurements.size()]),
-            compressionType);
-    IWritableMemChunk memSeries =
-        createAlignedMemChunkIfNotExistAndGet(
-            insertTabletPlan.getDeviceId().getFullPath(), vectorSchema);
-    memSeries.writeAlignedValues(
+    IWritableMemChunkGroup memChunkGroup =
+        createAlignedMemChunkGroupIfNotExistAndGet(
+            insertTabletPlan.getDeviceId().getFullPath(), schemaList);
+    memChunkGroup.writeValues(
         insertTabletPlan.getTimes(),
         insertTabletPlan.getColumns(),
         insertTabletPlan.getBitMaps(),
-        vectorSchema,
+        schemaList,
         start,
         end);
   }
 
   @Override
   public boolean checkIfChunkDoesNotExist(String deviceId, String measurement) {
-    Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
-    if (null == memSeries) {
+    IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+    if (null == memChunkGroup) {
       return true;
     }
-    return !memSeries.containsKey(measurement);
+    return !memChunkGroup.contains(measurement);
   }
 
   @Override
-  public int getCurrentChunkPointNum(String deviceId, String measurement) {
-    Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
-    IWritableMemChunk memChunk = memSeries.get(measurement);
-    return (int) memChunk.count();
+  public long getCurrentChunkPointNum(String deviceId, String measurement) {
+    IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+    return memChunkGroup.getCurrentChunkPointNum(measurement);
   }
 
   @Override
@@ -325,10 +298,8 @@ public abstract class AbstractMemTable implements IMemTable {
   @Override
   public long size() {
     long sum = 0;
-    for (Map<String, IWritableMemChunk> seriesMap : memTableMap.values()) {
-      for (IWritableMemChunk writableMemChunk : seriesMap.values()) {
-        sum += writableMemChunk.count();
-      }
+    for (IWritableMemChunkGroup writableMemChunkGroup : memTableMap.values()) {
+      sum += writableMemChunkGroup.count();
     }
     return sum;
   }
@@ -373,12 +344,13 @@ public abstract class AbstractMemTable implements IMemTable {
   @Override
   public void delete(
       PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
-    Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath());
-    if (deviceMap == null) {
+    IWritableMemChunkGroup memChunkGroup = memTableMap.get(devicePath.getFullPath());
+    if (memChunkGroup == null) {
       return;
     }
 
-    Iterator<Entry<String, IWritableMemChunk>> iter = deviceMap.entrySet().iterator();
+    Iterator<Entry<String, IWritableMemChunk>> iter =
+        memChunkGroup.getMemChunkMap().entrySet().iterator();
     while (iter.hasNext()) {
       Entry<String, IWritableMemChunk> entry = iter.next();
       IWritableMemChunk chunk = entry.getValue();
@@ -433,13 +405,8 @@ public abstract class AbstractMemTable implements IMemTable {
 
   @Override
   public void release() {
-    for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) {
-      for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) {
-        TVList list = subEntry.getValue().getTVList();
-        if (list.getReferenceCount() == 0) {
-          TVListAllocator.getInstance().release(list);
-        }
-      }
+    for (Entry<String, IWritableMemChunkGroup> entry : memTableMap.entrySet()) {
+      entry.getValue().release();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index cd7184b..69ebabb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -23,64 +23,71 @@ import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class AlignedWritableMemChunk implements IWritableMemChunk {
 
-  private final VectorMeasurementSchema schema;
+  private final Map<String, Integer> measurementIndexMap;
+  private final List<IMeasurementSchema> schemaList;
   private AlignedTVList list;
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
   private static final Logger LOGGER = LoggerFactory.getLogger(AlignedWritableMemChunk.class);
 
-  public AlignedWritableMemChunk(VectorMeasurementSchema schema) {
-    this.schema = schema;
-    this.list = TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList());
+  public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
+    this.measurementIndexMap = new LinkedHashMap<>();
+    List<TSDataType> dataTypeList = new ArrayList<>();
+    this.schemaList = schemaList;
+    for (int i = 0; i < schemaList.size(); i++) {
+      measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
+      dataTypeList.add(schemaList.get(i).getType());
+    }
+    this.list = TVListAllocator.getInstance().allocate(dataTypeList);
   }
 
   public boolean containsMeasurement(String measurementId) {
-    return schema.containsSubMeasurement(measurementId);
+    return measurementIndexMap.containsKey(measurementId);
   }
 
   @Override
   public void putLong(long t, long v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putInt(long t, int v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putFloat(long t, float v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putDouble(long t, double v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putBinary(long t, Binary v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putBoolean(long t, boolean v) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
@@ -90,32 +97,32 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
   public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
@@ -126,19 +133,20 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public void write(long insertTime, Object objectValue) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
-  public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) {
-    int[] columnIndexArray = checkColumnsInInsertPlan(schema);
+  public void writeAlignedValue(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+    int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
     putAlignedValue(insertTime, objectValue, columnIndexArray);
   }
 
   @Override
   public void write(
       long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
   }
 
   @Override
@@ -146,32 +154,29 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
-      IMeasurementSchema schema,
+      List<IMeasurementSchema> schemaList,
       int start,
       int end) {
-    int[] columnIndexArray = checkColumnsInInsertPlan(schema);
+    int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
     putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end);
   }
 
-  private int[] checkColumnsInInsertPlan(IMeasurementSchema schema) {
-    VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema;
-    List<String> measurementIdsInInsertPlan = vectorSchema.getSubMeasurementsList();
-    List<TSDataType> dataTypesInInsertPlan = vectorSchema.getSubMeasurementsTSDataTypeList();
-    List<TSEncoding> encodingsInInsertPlan = vectorSchema.getSubMeasurementsTSEncodingList();
-    for (int i = 0; i < measurementIdsInInsertPlan.size(); i++) {
-      if (!containsMeasurement(measurementIdsInInsertPlan.get(i))) {
-        this.schema.addMeasurement(
-            measurementIdsInInsertPlan.get(i),
-            dataTypesInInsertPlan.get(i),
-            encodingsInInsertPlan.get(i));
-        this.list.extendColumn(dataTypesInInsertPlan.get(i));
+  private int[] checkColumnsInInsertPlan(List<IMeasurementSchema> schemaListInInsertPlan) {
+    List<String> measurementIdsInInsertPlan = new ArrayList<>();
+    for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
+      measurementIdsInInsertPlan.add(schemaListInInsertPlan.get(i).getMeasurementId());
+      if (!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) {
+        this.measurementIndexMap.put(
+            schemaListInInsertPlan.get(i).getMeasurementId(), measurementIndexMap.size());
+        this.schemaList.add(schemaListInInsertPlan.get(i));
+        this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
       }
     }
-    List<String> measurementIdsInTVList = this.schema.getSubMeasurementsList();
-    int[] columnIndexArray = new int[measurementIdsInTVList.size()];
-    for (int i = 0; i < columnIndexArray.length; i++) {
-      columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementIdsInTVList.get(i));
-    }
+    int[] columnIndexArray = new int[measurementIndexMap.size()];
+    measurementIndexMap.forEach(
+        (measurementId, i) -> {
+          columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementId);
+        });
     return columnIndexArray;
   }
 
@@ -182,7 +187,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public long count() {
-    return (long) list.size() * schema.getSubMeasurementsCount();
+    return (long) list.size() * measurementIndexMap.size(); // list size x number of columns
   }
 
   public long alignedListSize() {
@@ -191,7 +196,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public IMeasurementSchema getSchema() {
-    return schema;
+    return null; // NOTE(review): always null now; confirm no caller dereferences the result
   }
 
   @Override
@@ -209,7 +214,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
     list.increaseReferenceCount();
     List<Integer> columnIndexList = new ArrayList<>();
     for (IMeasurementSchema measurementSchema : schemaList) {
-      columnIndexList.add(schema.getSubMeasurementIndex(measurementSchema.getMeasurementId()));
+      columnIndexList.add(measurementIndexMap.get(measurementSchema.getMeasurementId()));
     }
     return list.getTvListByColumnIndex(columnIndexList);
   }
@@ -243,7 +248,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public IChunkWriter createIChunkWriter() {
-    return new AlignedChunkWriterImpl(schema);
+    return new AlignedChunkWriterImpl(schemaList);
   }
 
   @Override
@@ -310,4 +315,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
       timeDuplicateAlignedRowIndexList = null;
     }
   }
+
+  @Override
+  public void release() { // hands the list back to TVListAllocator once nothing references it
+    if (list.getReferenceCount() == 0) {
+      TVListAllocator.getInstance().release(list);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
new file mode 100644
index 0000000..a15fb6f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Chunk group backed by exactly one {@link AlignedWritableMemChunk}. Every operation of the group
+ * is delegated to that chunk.
+ */
+public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
+
+  /** The single aligned chunk backing this group; assigned once in the constructor. */
+  private final AlignedWritableMemChunk memChunk;
+
+  /** Builds the backing aligned chunk from {@code schemaList}. */
+  public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList) {
+    memChunk = new AlignedWritableMemChunk(schemaList);
+  }
+
+  /** Forwards the batch unchanged to {@link AlignedWritableMemChunk#writeAlignedValues}. */
+  @Override
+  public void writeValues(
+      long[] times,
+      Object[] columns,
+      BitMap[] bitMaps,
+      List<IMeasurementSchema> schemaList,
+      int start,
+      int end) {
+    memChunk.writeAlignedValues(times, columns, bitMaps, schemaList, start, end);
+  }
+
+  /** Releases the backing chunk. */
+  @Override
+  public void release() {
+    memChunk.release();
+  }
+
+  /** Returns the point count reported by the backing chunk. */
+  @Override
+  public long count() {
+    return memChunk.count();
+  }
+
+  /** Returns whether the backing chunk already contains {@code measurement}. */
+  @Override
+  public boolean contains(String measurement) {
+    return memChunk.containsMeasurement(measurement);
+  }
+
+  /** Forwards a single row unchanged to {@link AlignedWritableMemChunk#writeAlignedValue}. */
+  @Override
+  public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+    memChunk.writeAlignedValue(insertTime, objectValue, schemaList);
+  }
+
+  /**
+   * Returns an immutable single-entry map whose only value is the backing chunk, stored under the
+   * empty-string key.
+   */
+  @Override
+  public Map<String, IWritableMemChunk> getMemChunkMap() {
+    return Collections.singletonMap("", memChunk);
+  }
+
+  /**
+   * Returns the backing chunk's {@code count()}; {@code measurement} is ignored because all
+   * columns share one chunk. NOTE(review): callers that take this value modulo the array size may
+   * expect a row count rather than a point count - confirm.
+   */
+  @Override
+  public long getCurrentChunkPointNum(String measurement) {
+    return memChunk.count();
+  }
+
+  /** Exposes the backing chunk for callers that need the aligned-specific API. */
+  public AlignedWritableMemChunk getAlignedMemChunk() {
+    return memChunk;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 1dc2446..e05b1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -42,12 +42,13 @@ import java.util.Map;
  */
 public interface IMemTable {
 
-  Map<String, Map<String, IWritableMemChunk>> getMemTableMap();
+  Map<String, IWritableMemChunkGroup> getMemTableMap();
 
-  void write(String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue);
+  void write(
+      String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue);
 
   void writeAlignedRow(
-      String deviceId, IMeasurementSchema schema, long insertTime, Object[] objectValue);
+      String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue);
   /**
    * write data in the range [start, end). Null value in each column values will be replaced by the
    * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
@@ -145,7 +146,7 @@ public interface IMemTable {
   boolean checkIfChunkDoesNotExist(String deviceId, String measurement);
 
   /** only used when mem control enabled */
-  int getCurrentChunkPointNum(String deviceId, String measurement);
+  long getCurrentChunkPointNum(String deviceId, String measurement);
 
   /** only used when mem control enabled */
   void addTextDataSize(long textDataIncrement);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index ee10ea1..ce8b6f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -60,7 +60,8 @@ public interface IWritableMemChunk {
 
   void write(long insertTime, Object objectValue);
 
-  void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema);
+  void writeAlignedValue(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
 
   /**
    * write data in the range [start, end). Null value in the valueList will be replaced by the
@@ -73,7 +74,7 @@ public interface IWritableMemChunk {
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
-      IMeasurementSchema schema,
+      List<IMeasurementSchema> schemaList,
       int start,
       int end);
 
@@ -129,4 +130,6 @@ public interface IWritableMemChunk {
   IChunkWriter createIChunkWriter();
 
   void encode(IChunkWriter chunkWriter);
+
+  void release();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
new file mode 100644
index 0000000..ffbd65c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A group of writable mem chunks; the memtable map holds one group per entry. The group routes
+ * writes to its chunk(s) and answers size and membership queries about them.
+ */
+public interface IWritableMemChunkGroup {
+
+  /**
+   * Writes a batch of values into the group.
+   *
+   * @param times timestamps of the batch
+   * @param columns value arrays, one element per column
+   * @param bitMaps bitmaps matching {@code columns}; may be {@code null}
+   * @param schemaList schemas of the columns being written
+   * @param start start of the row range to write
+   * @param end end of the row range to write
+   */
+  void writeValues(
+      long[] times,
+      Object[] columns,
+      BitMap[] bitMaps,
+      List<IMeasurementSchema> schemaList,
+      int start,
+      int end);
+
+  /** Releases the chunk(s) of this group. */
+  void release();
+
+  /** Returns the number of points held by the group. */
+  long count();
+
+  /** Returns whether the group holds a chunk or column for {@code measurement}. */
+  boolean contains(String measurement);
+
+  /**
+   * Writes one row into the group.
+   *
+   * @param insertTime timestamp of the row
+   * @param objectValue values of the row
+   * @param schemaList schemas of the values being written
+   */
+  void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
+
+  /**
+   * Returns the chunks of this group keyed by measurement id; an implementation with a single
+   * shared chunk may use a placeholder key.
+   */
+  Map<String, IWritableMemChunk> getMemChunkMap();
+
+  /** Returns the current point count for {@code measurement}. */
+  long getCurrentChunkPointNum(String measurement);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 6915967..425fc21 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -19,9 +19,6 @@
 
 package org.apache.iotdb.db.engine.memtable;
 
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -33,23 +30,13 @@ public class PrimitiveMemTable extends AbstractMemTable {
     this.disableMemControl = !enableMemControl;
   }
 
-  public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+  public PrimitiveMemTable(Map<String, IWritableMemChunkGroup> memTableMap) {
     super(memTableMap);
   }
 
   @Override
-  protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
-    return new WritableMemChunk(schema);
-  }
-
-  @Override
-  protected IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema) {
-    return new AlignedWritableMemChunk((VectorMeasurementSchema) schema);
-  }
-
-  @Override
   public IMemTable copy() {
-    Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>(getMemTableMap());
+    Map<String, IWritableMemChunkGroup> newMap = new HashMap<>(getMemTableMap());
 
     return new PrimitiveMemTable(newMap);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 3b096ad..51c7749 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -72,7 +72,8 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) {
+  public void writeAlignedValue(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
   }
 
@@ -114,7 +115,7 @@ public class WritableMemChunk implements IWritableMemChunk {
       long[] times,
       Object[] valueList,
       BitMap[] bitMaps,
-      IMeasurementSchema schema,
+      List<IMeasurementSchema> schemaList,
       int start,
       int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
@@ -346,4 +347,11 @@ public class WritableMemChunk implements IWritableMemChunk {
       }
     }
   }
+
+  @Override
+  public void release() { // NOTE(review): no-op; the release logic below is commented out
+    //        if (list.getReferenceCount() == 0) {
+    //          TVListAllocator.getInstance().release(list);
+    //        }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
new file mode 100644
index 0000000..5845da3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -0,0 +1,99 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WritableMemChunkGroup implements IWritableMemChunkGroup {
+
+  private Map<String, IWritableMemChunk> memChunkMap;
+
+  public WritableMemChunkGroup(List<IMeasurementSchema> schemaList) {
+    memChunkMap = new HashMap<>();
+    for (IMeasurementSchema schema : schemaList) {
+      createMemChunkIfNotExistAndGet(schema);
+    }
+  }
+
+  @Override
+  public void writeValues(
+      long[] times,
+      Object[] columns,
+      BitMap[] bitMaps,
+      List<IMeasurementSchema> schemaList,
+      int start,
+      int end) {
+    int emptyColumnCount = 0;
+    for (int i = 0; i < columns.length; i++) {
+      if (columns[i] == null) {
+        emptyColumnCount++;
+        continue;
+      }
+      IWritableMemChunk memChunk =
+          createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount));
+      memChunk.write(
+          times,
+          columns[i],
+          bitMaps == null ? null : bitMaps[i],
+          schemaList.get(i - emptyColumnCount).getType(),
+          start,
+          end);
+    }
+  }
+
+  private IWritableMemChunk createMemChunkIfNotExistAndGet(IMeasurementSchema schema) {
+    return memChunkMap.computeIfAbsent(
+        schema.getMeasurementId(),
+        k -> {
+          return new WritableMemChunk(schema);
+        });
+  }
+
+  @Override
+  public void release() {
+    for (IWritableMemChunk memChunk : memChunkMap.values()) {
+      memChunk.release();
+    }
+  }
+
+  @Override
+  public long count() {
+    long count = 0;
+    for (IWritableMemChunk memChunk : memChunkMap.values()) {
+      count += memChunk.count();
+    }
+    return count;
+  }
+
+  @Override
+  public boolean contains(String measurement) {
+    return memChunkMap.containsKey(measurement);
+  }
+
+  @Override
+  public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+    int emptyColumnCount = 0;
+    for (int i = 0; i < objectValue.length; i++) {
+      if (objectValue[i] == null) {
+        emptyColumnCount++;
+        continue;
+      }
+      IWritableMemChunk memChunk =
+          createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount));
+      memChunk.write(insertTime, objectValue[i]);
+    }
+  }
+
+  @Override
+  public Map<String, IWritableMemChunk> getMemChunkMap() {
+    return memChunkMap;
+  }
+
+  @Override
+  public long getCurrentChunkPointNum(String measurement) {
+    return memChunkMap.get(measurement).count();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 93d020f..5908597 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
 import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
@@ -355,7 +356,7 @@ public class TsFileProcessor {
         memTableIncrement += TVList.tvListArrayMemSize(insertRowPlan.getDataTypes()[i]);
       } else {
         // here currentChunkPointNum >= 1
-        int currentChunkPointNum =
+        long currentChunkPointNum =
             workMemTable.getCurrentChunkPointNum(deviceId, insertRowPlan.getMeasurements()[i]);
         memTableIncrement +=
             (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
@@ -378,7 +379,7 @@ public class TsFileProcessor {
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
-    AlignedWritableMemChunk vectorMemChunk = null;
+    AlignedWritableMemChunk alignedMemChunk = null;
     String deviceId = insertRowPlan.getDeviceId().getFullPath();
     if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
       // ChunkMetadataIncrement
@@ -388,15 +389,15 @@ public class TsFileProcessor {
       memTableIncrement += AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes());
     } else {
       // here currentChunkPointNum >= 1
-      int currentChunkPointNum =
+      long currentChunkPointNum =
           workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
       memTableIncrement +=
           (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
               ? AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes())
               : 0;
-      vectorMemChunk =
-          ((AlignedWritableMemChunk)
-              workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER));
+      alignedMemChunk =
+          ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId))
+              .getAlignedMemChunk();
     }
     for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
       // skip failed Measurements
@@ -404,10 +405,10 @@ public class TsFileProcessor {
         continue;
       }
       // extending the column of aligned mem chunk
-      if (vectorMemChunk != null
-          && !vectorMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
+      if (alignedMemChunk != null
+          && !alignedMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
         memTableIncrement +=
-            (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
+            (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
                 * insertRowPlan.getDataTypes()[i].getDataTypeSize();
       }
       // TEXT data mem size
@@ -486,13 +487,13 @@ public class TsFileProcessor {
           ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
               * TVList.tvListArrayMemSize(dataType);
     } else {
-      int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, measurement);
+      long currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, measurement);
       if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
         memIncrements[0] +=
             ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
                 * TVList.tvListArrayMemSize(dataType);
       } else {
-        int acquireArray =
+        long acquireArray =
             (end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
                 / PrimitiveArrayManager.ARRAY_SIZE;
         memIncrements[0] +=
@@ -526,7 +527,7 @@ public class TsFileProcessor {
               * AlignedTVList.alignedTvListArrayMemSize(dataTypes);
     } else {
       int currentChunkPointNum =
-          workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
+          (int) workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
       if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
         memIncrements[0] +=
             ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -541,8 +542,8 @@ public class TsFileProcessor {
                 : acquireArray * AlignedTVList.alignedTvListArrayMemSize(dataTypes);
       }
       vectorMemChunk =
-          ((AlignedWritableMemChunk)
-              workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER));
+          ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId))
+              .getAlignedMemChunk();
     }
     for (int i = 0; i < dataTypes.length; i++) {
       TSDataType dataType = dataTypes[i];
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index ff51b18..f85338d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -20,7 +20,8 @@
 package org.apache.iotdb.db.metadata.path;
 
 import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk;
@@ -334,14 +335,14 @@ public class AlignedPath extends PartialPath {
 
   @Override
   public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
-      Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+      Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
       throws QueryProcessException, IOException {
     // check If memtable contains this path
     if (!memTableMap.containsKey(getDevice())) {
       return null;
     }
     AlignedWritableMemChunk alignedMemChunk =
-        ((AlignedWritableMemChunk) memTableMap.get(getDevice()).get(VECTOR_PLACEHOLDER));
+        ((AlignedWritableMemChunkGroup) memTableMap.get(getDevice())).getAlignedMemChunk();
     boolean containsMeasurement = false;
     for (String measurement : measurementList) {
       if (alignedMemChunk.containsMeasurement(measurement)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index d8aef64..d6900de 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.metadata.path;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -246,14 +247,15 @@ public class MeasurementPath extends PartialPath {
 
   @Override
   public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
-      Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+      Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
       throws QueryProcessException, IOException {
     // check If Memtable Contains this path
     if (!memTableMap.containsKey(getDevice())
-        || !memTableMap.get(getDevice()).containsKey(getMeasurement())) {
+        || !memTableMap.get(getDevice()).contains(getMeasurement())) {
       return null;
     }
-    IWritableMemChunk memChunk = memTableMap.get(getDevice()).get(getMeasurement());
+    IWritableMemChunk memChunk =
+        memTableMap.get(getDevice()).getMemChunkMap().get(getMeasurement());
     // get sorted tv list is synchronized so different query can get right sorted list reference
     TVList chunkCopy = memChunk.getSortedTvListForQuery();
     int curSize = chunkCopy.size();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index 90581d6..ae42486 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.metadata.path;
 
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -423,7 +423,7 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
    * @return ReadOnlyMemChunk
    */
   public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
-      Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+      Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
       throws QueryProcessException, IOException {
     throw new UnsupportedOperationException("Should call exact sub class!");
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 5fb024b..c29ebc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -241,11 +241,7 @@ public class InsertRowPlan extends InsertPlan {
           }
           continue;
         }
-        if (isAligned) {
-          dataTypes[i] = measurementMNodes[i].getSchema().getSubMeasurementsTSDataTypeList().get(i);
-        } else {
-          dataTypes[i] = measurementMNodes[i].getSchema().getType();
-        }
+        dataTypes[i] = measurementMNodes[i].getSchema().getType();
         try {
           values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
         } catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index e81a2ab..10617cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -56,10 +56,23 @@ public class MemUtils {
   }
 
   /**
+   * Sums getRecordSize over each data type and its value. If mem control enabled, do not add text
+   * data size here, the size will be added to memtable before inserting.
+   */
+  public static long getRecordsSize(
+      List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
+    long memSize = 0L;
+    for (int i = 0; i < dataTypes.size(); i++) { // value[i] pairs with dataTypes.get(i)
+      memSize += getRecordSize(dataTypes.get(i), value[i], addingTextDataSize);
+    }
+    return memSize;
+  }
+
+  /**
    * function for getting the vector value size. If mem control enabled, do not add text data size
    * here, the size will be added to memtable before inserting.
    */
-  public static long getAlignedRecordSize(
+  public static long getAlignedRecordsSize(
       List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
     // time and index size
     long memSize = 8L + 4L;
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 7ded716..836fdfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.writelog.recover;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
 import org.apache.iotdb.db.engine.memtable.WritableMemChunk;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -126,11 +127,11 @@ public class LogReplayer {
       }
     }
 
-    Map<String, Map<String, IWritableMemChunk>> memTableMap = recoverMemTable.getMemTableMap();
-    for (Map.Entry<String, Map<String, IWritableMemChunk>> deviceEntry : memTableMap.entrySet()) {
+    Map<String, IWritableMemChunkGroup> memTableMap = recoverMemTable.getMemTableMap();
+    for (Map.Entry<String, IWritableMemChunkGroup> deviceEntry : memTableMap.entrySet()) {
       String deviceId = deviceEntry.getKey();
       for (Map.Entry<String, IWritableMemChunk> measurementEntry :
-          deviceEntry.getValue().entrySet()) {
+          deviceEntry.getValue().getMemChunkMap().entrySet()) {
         WritableMemChunk memChunk = (WritableMemChunk) measurementEntry.getValue();
         currentTsFileResource.updateStartTime(deviceId, memChunk.getFirstPoint());
         currentTsFileResource.updateEndTime(deviceId, memChunk.getLastPoint());
@@ -194,10 +195,19 @@ public class LogReplayer {
     // mark failed plan manually
     checkDataTypeAndMarkFailed(mNodes, plan);
     if (plan instanceof InsertRowPlan) {
-      recoverMemTable.insert((InsertRowPlan) plan);
+      if (plan.isAligned()) {
+        recoverMemTable.insertAlignedRow((InsertRowPlan) plan);
+      } else {
+        recoverMemTable.insert((InsertRowPlan) plan);
+      }
     } else {
-      recoverMemTable.insertTablet(
-          (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      if (plan.isAligned()) {
+        recoverMemTable.insertAlignedTablet(
+            (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      } else {
+        recoverMemTable.insertTablet(
+            (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
index 09b21c6..00a6fd1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class MemTableTestUtils {
@@ -62,9 +63,10 @@ public class MemTableTestUtils {
     for (long l = startTime; l <= endTime; l++) {
       iMemTable.write(
           deviceId,
-          new UnaryMeasurementSchema(measurementId, dataType, TSEncoding.PLAIN),
+          Collections.singletonList(
+              new UnaryMeasurementSchema(measurementId, dataType, TSEncoding.PLAIN)),
           l,
-          (int) l);
+          new Object[] {(int) l});
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
index c11cd90..086a647 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
+import java.util.Collections;
+
 /** Memtable insert benchmark. Bench the Memtable and get its performance. */
 public class MemtableBenchmark {
 
@@ -46,9 +48,10 @@ public class MemtableBenchmark {
       for (int j = 0; j < numOfMeasurement; j++) {
         memTable.write(
             deviceId,
-            new UnaryMeasurementSchema(measurementId[j], tsDataType, TSEncoding.PLAIN),
+            Collections.singletonList(
+                new UnaryMeasurementSchema(measurementId[j], tsDataType, TSEncoding.PLAIN)),
             System.nanoTime(),
-            String.valueOf(System.currentTimeMillis()));
+            new Object[] {String.valueOf(System.currentTimeMillis())});
       }
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index a1c925f..6be17a0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -119,16 +119,18 @@ public class PrimitiveMemTableTest {
     for (int i = 0; i < dataSize; i++) {
       memTable.write(
           deviceId,
-          new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN),
+          Collections.singletonList(
+              new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN)),
           dataSize - i - 1,
-          i + 10);
+          new Object[] {i + 10});
     }
     for (int i = 0; i < dataSize; i++) {
       memTable.write(
           deviceId,
-          new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN),
+          Collections.singletonList(
+              new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN)),
           i,
-          i);
+          new Object[] {i});
     }
     MeasurementPath fullPath =
         new MeasurementPath(
@@ -163,9 +165,9 @@ public class PrimitiveMemTableTest {
     for (TimeValuePair aRet : ret) {
       memTable.write(
           deviceId,
-          new UnaryMeasurementSchema(sensorId, dataType, encoding),
+          Collections.singletonList(new UnaryMeasurementSchema(sensorId, dataType, encoding)),
           aRet.getTimestamp(),
-          aRet.getValue().getValue());
+          new Object[] {aRet.getValue().getValue()});
     }
     MeasurementPath fullPath =
         new MeasurementPath(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 4c977ca..81999ed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iotdb.tsfile.write.chunk;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -63,6 +66,31 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
     this.valueIndex = 0;
   }
 
+  public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList) {
+    TSEncoding timeEncoding =
+        TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+    TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+    timeChunkWriter =
+        new TimeChunkWriter(
+            "",
+            schemaList.get(0).getCompressor(),
+            timeEncoding,
+            TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType));
+
+    valueChunkWriterList = new ArrayList<>(schemaList.size());
+    for (int i = 0; i < schemaList.size(); i++) {
+      valueChunkWriterList.add(
+          new ValueChunkWriter(
+              schemaList.get(i).getMeasurementId(),
+              schemaList.get(i).getCompressor(),
+              schemaList.get(i).getType(),
+              schemaList.get(i).getEncodingType(),
+              schemaList.get(i).getValueEncoder()));
+    }
+
+    this.valueIndex = 0;
+  }
+
   public void write(long time, int value, boolean isNull) {
     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
   }

[iotdb] 02/02: Merge branch 'master' of https://github.com/apache/iotdb into IWritableMemChunkGroup

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch IWritableMemChunkGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0acc2923b4dfe8844e9a5baca94b0d5411f6cbba
Merge: 0cd21e1 96df186
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Nov 17 11:21:00 2021 +0800

    Merge branch 'master' of https://github.com/apache/iotdb into IWritableMemChunkGroup

 .../org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4    |   8 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  70 +++++--
 .../iotdb/cluster/coordinator/Coordinator.java     |   8 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  21 +-
 .../apache/iotdb/cluster/utils/PartitionUtils.java |  10 +-
 .../log/snapshot/MetaSimpleSnapshotTest.java       |   2 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   2 +-
 .../cluster/utils/CreateTemplatePlanUtil.java      |   2 +-
 docs/UserGuide/Appendix/SQL-Reference.md           |  89 +++++++--
 .../Data-Concept/Data-Model-and-Terminology.md     |  32 +--
 ...{Measurement-Template.md => Schema-Template.md} |  20 +-
 .../DDL-Data-Definition-Language.md                |  75 +++++--
 docs/zh/UserGuide/Appendix/SQL-Reference.md        |  87 ++++++--
 .../Data-Concept/Data-Model-and-Terminology.md     |  30 ++-
 ...{Measurement-Template.md => Schema-Template.md} |   2 +-
 .../DDL-Data-Definition-Language.md                |  72 +++++--
 .../iotdb/AlignedTimeseriesSessionExample.java     |   9 +-
 .../dropwizard/DropwizardMetricManager.java        |   4 +-
 .../dropwizard/DropwizardMetricManagerTest.java    |  26 +--
 .../org/apache/iotdb/metrics/MetricManager.java    |   4 +-
 .../iotdb/metrics/impl/DoNothingMetricManager.java |   4 +-
 .../micrometer/MicrometerMetricManager.java        |  13 +-
 .../reporter/MicrometerPrometheusReporter.java     |   2 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  56 +++---
 .../iotdb/db/metadata/logfile/MLogTxtWriter.java   |  19 +-
 .../iotdb/db/metadata/logfile/MLogWriter.java      |  18 +-
 .../org/apache/iotdb/db/metadata/mtree/MTree.java  |   6 +-
 .../iotdb/db/metadata/template/Template.java       |   2 +-
 .../db/metadata/template/TemplateManager.java      |   6 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  10 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  51 +++--
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   6 +-
 .../iotdb/db/qp/logical/crud/InsertOperator.java   |  21 +-
 .../qp/logical/sys/ActivateTemplateOperator.java   |  51 +++++
 .../sys/CreateAlignedTimeSeriesOperator.java       | 131 ++++++++++++
 .../db/qp/logical/sys/CreateTemplateOperator.java  | 122 ++++++++++++
 .../db/qp/logical/sys/SetTemplateOperator.java     |  60 ++++++
 .../db/qp/logical/sys/UnsetTemplateOperator.java   |  60 ++++++
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  24 +--
 ...TemplatePlan.java => ActivateTemplatePlan.java} |  16 +-
 .../physical/{crud => sys}/AppendTemplatePlan.java |   2 +-
 .../physical/sys/CreateAlignedTimeSeriesPlan.java  |  35 ++--
 .../physical/{crud => sys}/CreateTemplatePlan.java |   2 +-
 .../physical/{crud => sys}/PruneTemplatePlan.java  |   2 +-
 .../SetTemplatePlan.java}                          |  16 +-
 .../UnsetTemplatePlan.java}                        |  16 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    | 219 +++++++++++++++++++--
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  22 ++-
 .../org/apache/iotdb/db/tools/mlog/MLogParser.java |  21 +-
 .../IoTDBCreateAlignedTimeseriesIT.java            | 106 ++++++++++
 .../db/integration/IoTDBSchemaTemplateIT.java      | 177 +++++++++++++++++
 .../iotdb/db/metadata/MManagerBasicTest.java       |  82 ++++----
 .../org/apache/iotdb/db/metadata/TemplateTest.java |   9 +-
 .../iotdb/db/metadata/mlog/MLogUpgraderTest.java   |   2 +-
 .../iotdb/db/qp/physical/InsertRowPlanTest.java    |   6 +-
 .../db/qp/physical/PhysicalPlanSerializeTest.java  |   2 +-
 .../org/apache/iotdb/db/tools/MLogParserTest.java  |  17 +-
 .../java/org/apache/iotdb/session/Session.java     |   9 +-
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |   2 +-
 .../session/IoTDBSessionVectorABDeviceIT.java      |   4 +-
 .../IoTDBSessionVectorAggregationWithUnSeqIT.java  |   4 +-
 .../apache/iotdb/session/template/TemplateUT.java  |   2 +-
 thrift/rpc-changelist.md                           |   4 +-
 thrift/src/main/thrift/rpc.thrift                  |   2 +-
 64 files changed, 1586 insertions(+), 428 deletions(-)