You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 09:39:20 UTC

[iotdb] branch vectorMemTable created (now 49b19a4)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 49b19a4  fix conflicts

This branch includes the following new commits:

     new eb42aed  memtable
     new 56d92eb  merge Vector
     new 27883e0  Memtable
     new 49b19a4  fix conflicts

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 04/04: fix conflicts

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 49b19a4915f85180db1d825377892d6b13f2357e
Merge: 27883e0 20ecbc7
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 17:34:30 2021 +0800

    fix conflicts

 .../java/org/apache/iotdb/cluster/ClientMain.java  |  5 +-
 .../cluster/client/sync/SyncClientAdaptor.java     |  6 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   | 14 ++---
 .../apache/iotdb/cluster/metadata/MetaPuller.java  | 15 ++---
 .../iotdb/cluster/query/LocalQueryExecutor.java    |  6 +-
 .../caller/PullMeasurementSchemaHandler.java       |  7 ++-
 .../cluster/client/sync/SyncClientAdaptorTest.java |  5 +-
 .../org/apache/iotdb/cluster/common/IoTDBTest.java |  4 +-
 .../org/apache/iotdb/cluster/common/TestUtils.java |  9 +--
 .../caller/PullMeasurementSchemaHandlerTest.java   | 10 ++--
 .../cluster/server/member/DataGroupMemberTest.java |  3 +-
 .../cluster/server/member/MetaGroupMemberTest.java | 10 ++--
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  5 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  3 +-
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  3 +-
 .../db/engine/memtable/PrimitiveMemTable.java      |  1 -
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  1 -
 .../db/engine/merge/manage/MergeResource.java      | 12 ++--
 .../db/engine/merge/task/MergeMultiChunkTask.java  |  6 +-
 .../iotdb/db/engine/merge/task/MergeTask.java      |  4 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  4 +-
 .../apache/iotdb/db/metadata/MLogTxtWriter.java    |  4 +-
 .../org/apache/iotdb/db/metadata/MManager.java     | 13 +++--
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  8 +--
 .../apache/iotdb/db/metadata/MeasurementMeta.java  | 14 ++---
 .../iotdb/db/metadata/mnode/MeasurementMNode.java  | 11 ++--
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  8 +--
 .../db/qp/physical/sys/MeasurementMNodePlan.java   |  9 +--
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  | 29 +++++-----
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |  3 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |  3 +-
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |  3 +-
 .../java/org/apache/iotdb/session/Session.java     |  6 +-
 .../iotdb/spark/tsfile/NarrowConverter.scala       |  4 +-
 .../apache/iotdb/spark/tsfile/WideConverter.scala  |  5 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 13 +++--
 .../apache/iotdb/tsfile/write/TsFileWriter.java    | 13 +++--
 .../tsfile/write/chunk/ChunkGroupWriterImpl.java   |  3 +-
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  |  8 +--
 .../tsfile/write/chunk/IChunkGroupWriter.java      |  4 +-
 .../apache/iotdb/tsfile/write/page/PageWriter.java |  6 +-
 .../tsfile/write/schema/IMeasurementSchema.java    | 15 +++++
 .../tsfile/write/schema/MeasurementSchema.java     | 67 ++++++++++++----------
 .../apache/iotdb/tsfile/write/schema/Schema.java   | 25 ++++----
 .../write/writer/RestorableTsFileIOWriter.java     |  8 +--
 .../org/apache/iotdb/tsfile/utils/RecordUtils.java |  4 +-
 .../tsfile/write/DefaultDeviceTemplateTest.java    |  3 +-
 .../write/schema/converter/SchemaBuilderTest.java  | 17 +++---
 48 files changed, 236 insertions(+), 203 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index dd18de4,3538aa1..f22070e
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@@ -186,20 -157,10 +185,20 @@@ public abstract class AbstractMemTable 
    public void write(
        String deviceId,
        String measurement,
-       MeasurementSchema schema,
+       IMeasurementSchema schema,
        long insertTime,
        Object objectValue) {
 -    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, measurement, schema);
 +    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
 +    memSeries.write(insertTime, objectValue);
 +  }
 +
 +  @Override
 +  public void write(
 +      String deviceId,
 +      IMeasurementSchema schema,
 +      long insertTime,
 +      Object objectValue) {
 +    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
      memSeries.write(insertTime, objectValue);
    }
  
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 3b2368d,e035787..db073ca
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@@ -325,34 -359,4 +359,5 @@@ public class MeasurementSchem
    public void setType(TSDataType type) {
      this.type = type.serialize();
    }
 +
-   @Override
-   public TSEncoding getTimeTSEncoding() {
-     // TODO Auto-generated method stub
-     return null;
-   }
- 
-   @Override
-   public List<String> getValueMeasurementIdList() {
-     // TODO Auto-generated method stub
-     return null;
-   }
- 
-   @Override
-   public List<TSDataType> getValueTSDataTypeList() {
-     // TODO Auto-generated method stub
-     return null;
-   }
- 
-   @Override
-   public List<TSEncoding> getValueTSEncodingList() {
-     // TODO Auto-generated method stub
-     return null;
-   }
- 
-   @Override
-   public List<Encoder> getValueEncoderList() {
-     // TODO Auto-generated method stub
-     return null;
-   }
  }


[iotdb] 03/04: Memtable

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 27883e0c9485f503fc721037440b3c9ce1b6e41d
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 17:07:09 2021 +0800

    Memtable
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  3 +-
 .../iotdb/db/engine/flush/NotifyFlushMemTable.java |  4 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 48 +++++++++++++++++++---
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  7 ++++
 .../db/engine/memtable/IWritableMemChunk.java      |  6 ++-
 .../db/engine/memtable/PrimitiveMemTable.java      |  3 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java | 15 +++++--
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |  3 ++
 .../tsfile/write/schema/MeasurementSchema.java     | 34 ++++++++++++++-
 9 files changed, 108 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index c1bc5a9..4134d1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
@@ -110,7 +111,7 @@ public class MemTableFlushTask {
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
-        MeasurementSchema desc = series.getSchema();
+        IMeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.put(new Pair<>(tvList, desc));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 5ba50d0..4aeb66c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.flush;
 import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 /**
  * Only used in sync flush and async close to start a flush task This memtable is not managed by
@@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 public class NotifyFlushMemTable extends AbstractMemTable {
 
   @Override
-  protected IWritableMemChunk genMemSeries(MeasurementSchema schema) {
+  protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 21c9bf2..dd18de4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
@@ -93,12 +94,12 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   private IWritableMemChunk createIfNotExistAndGet(
-      String deviceId, String measurement, MeasurementSchema schema) {
+      String deviceId, IMeasurementSchema schema) {
     Map<String, IWritableMemChunk> memSeries =
         memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
 
     return memSeries.computeIfAbsent(
-        measurement,
+        schema.getMeasurementId(),
         k -> {
           seriesNumber++;
           totalPointsNumThreshold += avgSeriesPointNumThreshold;
@@ -106,7 +107,7 @@ public abstract class AbstractMemTable implements IMemTable {
         });
   }
 
-  protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema);
+  protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema);
 
   @Override
   public void insert(InsertRowPlan insertRowPlan) {
@@ -138,6 +139,34 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   @Override
+  public void insert(InsertVectorPlan insertVectorPlan) {
+    updatePlanIndexes(insertVectorPlan.getIndex());
+    Object[] values = insertVectorPlan.getValues();
+
+    MeasurementMNode[] measurementMNodes = insertVectorPlan.getMeasurementMNodes();
+    String[] measurements = insertVectorPlan.getMeasurements();
+    IMeasurementSchema vmSchema = (IMeasurementSchema) measurementMNodes[0].getSchema();
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      if (value == null) {
+        continue;
+      }
+
+      memSize +=
+          MemUtils.getRecordSize(
+              vmSchema.getValueTSDataTypeList().get(i), value, disableMemControl);
+    }
+    write(
+        insertVectorPlan.getDeviceId().getFullPath(),
+        vmSchema,
+        insertVectorPlan.getTime(),
+        values);
+
+    totalPointsNum +=
+        insertVectorPlan.getMeasurements().length - insertVectorPlan.getFailedMeasurementNumber();
+  }
+
+  @Override
   public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
       throws WriteProcessException {
     updatePlanIndexes(insertTabletPlan.getIndex());
@@ -160,7 +189,17 @@ public abstract class AbstractMemTable implements IMemTable {
       MeasurementSchema schema,
       long insertTime,
       Object objectValue) {
-    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, measurement, schema);
+    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
+    memSeries.write(insertTime, objectValue);
+  }
+
+  @Override
+  public void write(
+      String deviceId,
+      IMeasurementSchema schema,
+      long insertTime,
+      Object objectValue) {
+    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
     memSeries.write(insertTime, objectValue);
   }
 
@@ -174,7 +213,6 @@ public abstract class AbstractMemTable implements IMemTable {
       IWritableMemChunk memSeries =
           createIfNotExistAndGet(
               insertTabletPlan.getDeviceId().getFullPath(),
-              insertTabletPlan.getMeasurements()[i],
               insertTabletPlan.getMeasurementMNodes()[i].getSchema());
       memSeries.write(
           insertTabletPlan.getTimes(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index ce412a2..3a46e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
@@ -53,6 +54,12 @@ public interface IMemTable {
       long insertTime,
       Object objectValue);
 
+  void write(
+      String deviceId,
+      IMeasurementSchema schema,
+      long insertTime,
+      Object objectValue);
+
   void write(InsertTabletPlan insertTabletPlan, int start, int end);
 
   /** @return the number of points */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index bdb4bbc..10298e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.memtable;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 public interface IWritableMemChunk {
 
@@ -36,6 +36,8 @@ public interface IWritableMemChunk {
   void putBinary(long t, Binary v);
 
   void putBoolean(long t, boolean v);
+  
+  void putVector(long t, byte[] v);
 
   void putLongs(long[] t, long[] v, int start, int end);
 
@@ -56,7 +58,7 @@ public interface IWritableMemChunk {
 
   long count();
 
-  MeasurementSchema getSchema();
+  IMeasurementSchema getSchema();
 
   /**
    * served for query requests.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 254d722..0336c61 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.HashMap;
@@ -38,7 +39,7 @@ public class PrimitiveMemTable extends AbstractMemTable {
   }
 
   @Override
-  protected IWritableMemChunk genMemSeries(MeasurementSchema schema) {
+  protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
     return new WritableMemChunk(schema, TVListAllocator.getInstance().allocate(schema.getType()));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 4536896..8cf3422 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -22,14 +22,15 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 public class WritableMemChunk implements IWritableMemChunk {
 
-  private MeasurementSchema schema;
+  private IMeasurementSchema schema;
   private TVList list;
 
-  public WritableMemChunk(MeasurementSchema schema, TVList list) {
+  public WritableMemChunk(IMeasurementSchema schema, TVList list) {
     this.schema = schema;
     this.list = list;
   }
@@ -55,6 +56,9 @@ public class WritableMemChunk implements IWritableMemChunk {
       case TEXT:
         putBinary(insertTime, (Binary) objectValue);
         break;
+      case VECTOR:
+        putVector(insertTime, (byte[]) objectValue);
+        break;
       default:
         throw new UnSupportedDataTypeException("Unsupported data type:" + schema.getType());
     }
@@ -123,6 +127,11 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
+  public void putVector(long t, byte[] v) {
+    list.putVector(t, v);
+  }
+
+  @Override
   public void putLongs(long[] t, long[] v, int start, int end) {
     list.putLongs(t, v, start, end);
   }
@@ -188,7 +197,7 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public MeasurementSchema getSchema() {
+  public IMeasurementSchema getSchema() {
     return schema;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index 8236e15..791af95 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -124,6 +124,9 @@ public class ReadOnlyMemChunk {
           case DOUBLE:
             statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
             break;
+          case VECTOR:
+            statsByType.update(timeValuePair.getTimestamp());
+            break;
           default:
             throw new QueryProcessException("Unsupported data type:" + dataType);
         }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 34ee496..3b2368d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -33,6 +33,7 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -42,7 +43,8 @@ import java.util.Objects;
  * MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has
  * TSDataTypeConverter up to now.
  */
-public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable {
+public class MeasurementSchema implements Comparable<MeasurementSchema>,
+    Serializable, IMeasurementSchema {
 
   public static final MeasurementSchema TIME_SCHEMA =
       new MeasurementSchema(
@@ -323,4 +325,34 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   public void setType(TSDataType type) {
     this.type = type.serialize();
   }
+
+  @Override
+  public TSEncoding getTimeTSEncoding() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<String> getValueMeasurementIdList() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getValueTSDataTypeList() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<TSEncoding> getValueTSEncodingList() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<Encoder> getValueEncoderList() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }


[iotdb] 02/04: merge Vector

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56d92eb416866d86d8217d5a2f0de1b425b8237f
Merge: eb42aed 9c72690
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 14:05:44 2021 +0800

    merge Vector

 .../iotdb/cluster/client/DataClientProvider.java   |   4 +-
 .../cluster/client/async/AsyncClientPool.java      |  34 ++-
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  59 +++--
 .../iotdb/cluster/client/sync/SyncDataClient.java  |   9 +-
 .../iotdb/cluster/client/sync/SyncMetaClient.java  |  10 +-
 .../exception/BadSeedUrlFormatException.java       |   3 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  78 ++----
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  14 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  37 +--
 .../cluster/query/aggregate/ClusterAggregator.java |  14 +-
 .../cluster/query/fill/ClusterPreviousFill.java    |  14 +-
 .../query/groupby/RemoteGroupByExecutor.java       |  27 +-
 .../query/last/ClusterLastQueryExecutor.java       |  14 +-
 .../cluster/query/reader/ClusterReaderFactory.java |  14 +-
 .../iotdb/cluster/query/reader/DataSourceInfo.java |  12 +-
 .../apache/iotdb/cluster/server/ClientServer.java  |  11 +-
 .../cluster/server/PullSnapshotHintService.java    |   9 +-
 .../cluster/server/service/BaseSyncService.java    |  15 +-
 .../cluster/server/service/DataSyncService.java    |   7 +-
 .../cluster/server/service/MetaSyncService.java    |   5 +-
 .../cluster/client/DataClientProviderTest.java     | 136 ++++++++++
 .../cluster/client/sync/SyncClientPoolTest.java    |  17 +-
 .../cluster/client/sync/SyncDataClientTest.java    |  51 ++++
 .../cluster/client/sync/SyncMetaClientTest.java    |  47 ++++
 server/src/assembly/resources/conf/iotdb-env.bat   |   5 +
 server/src/assembly/resources/conf/iotdb-env.sh    |   3 +
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  12 +-
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |  12 +-
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |  27 +-
 .../iotdb/tsfile/file/header/ChunkHeader.java      |  15 +-
 .../file/metadata/statistics/Statistics.java       |  94 ++-----
 .../file/metadata/statistics/TimeStatistics.java   | 161 ++++++++++++
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |   4 +
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java |  18 +-
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  |  24 +-
 .../iotdb/tsfile/write/chunk/IChunkWriter.java     |  15 +-
 .../iotdb/tsfile/write/chunk/TimeChunkWriter.java  | 255 ++++++++++++++++++
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 252 ++++++++++++++++++
 .../tsfile/write/chunk/VectorChunkWriterImpl.java  | 202 ++++++++++++++
 .../apache/iotdb/tsfile/write/page/PageWriter.java |   1 +
 .../iotdb/tsfile/write/page/TimePageWriter.java    | 177 +++++++++++++
 .../page/{PageWriter.java => ValuePageWriter.java} | 160 ++++++-----
 .../write/record/datapoint/BooleanDataPoint.java   |   2 +-
 .../write/record/datapoint/DoubleDataPoint.java    |   2 +-
 .../write/record/datapoint/FloatDataPoint.java     |   2 +-
 .../write/record/datapoint/IntDataPoint.java       |   2 +-
 .../write/record/datapoint/LongDataPoint.java      |   2 +-
 .../write/record/datapoint/StringDataPoint.java    |   2 +-
 .../tsfile/write/schema/IMeasurementSchema.java    |  34 ++-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  22 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |   3 +-
 .../write/writer/RestorableTsFileIOWriterTest.java |   5 +-
 .../tsfile/write/writer/TestTsFileOutput.java      |  70 +++++
 .../tsfile/write/writer/TimeChunkWriterTest.java   | 111 ++++++++
 .../tsfile/write/writer/TimePageWriterTest.java    | 171 ++++++++++++
 .../tsfile/write/writer/ValueChunkWriterTest.java  | 109 ++++++++
 .../tsfile/write/writer/ValuePageWriterTest.java   | 291 +++++++++++++++++++++
 .../write/writer/VectorChunkWriterImplTest.java    | 178 +++++++++++++
 .../write/writer/VectorMeasurementSchemaStub.java  |  80 ++++++
 59 files changed, 2722 insertions(+), 442 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index dcd82ab,caa0893..c1bc5a9
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@@ -173,29 -173,23 +173,29 @@@ public class MemTableFlushTask 
  
              switch (dataType) {
                case BOOLEAN:
-                 seriesWriterImpl.write(time, tvPairs.getBoolean(i));
+                 seriesWriterImpl.write(time, tvPairs.getBoolean(i), false);
                  break;
                case INT32:
-                 seriesWriterImpl.write(time, tvPairs.getInt(i));
+                 seriesWriterImpl.write(time, tvPairs.getInt(i), false);
                  break;
                case INT64:
-                 seriesWriterImpl.write(time, tvPairs.getLong(i));
+                 seriesWriterImpl.write(time, tvPairs.getLong(i), false);
                  break;
                case FLOAT:
-                 seriesWriterImpl.write(time, tvPairs.getFloat(i));
+                 seriesWriterImpl.write(time, tvPairs.getFloat(i), false);
                  break;
                case DOUBLE:
-                 seriesWriterImpl.write(time, tvPairs.getDouble(i));
+                 seriesWriterImpl.write(time, tvPairs.getDouble(i), false);
                  break;
                case TEXT:
-                 seriesWriterImpl.write(time, tvPairs.getBinary(i));
+                 seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
                  break;
 +              case VECTOR:
 +                // TODO: 
 +//                for ( : tvPairs.getVector(i)) {
 +//                  seriesWriterImpl.write(time, tvPairs.getVector(i)[], get);
 +//                }
 +                break;
                default:
                  LOGGER.error(
                      "Storage group {} does not support data type: {}", storageGroup, dataType);
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 1ec2001,1c1ba7f..759a751
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@@ -77,6 -77,8 +77,8 @@@ public abstract class Statistics<T> 
          return new DoubleStatistics();
        case FLOAT:
          return new FloatStatistics();
 -      case Vector:
++      case VECTOR:
+         return new TimeStatistics();
        default:
          throw new UnknownColumnTypeException(type.toString());
      }
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
index 0000000,74bd701..e812166
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/TimeStatistics.java
@@@ -1,0 -1,161 +1,161 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.file.metadata.statistics;
+ 
+ import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.ByteBuffer;
+ 
+ public class TimeStatistics extends Statistics {
+ 
+   static final int TIME_STATISTICS_FIXED_RAM_SIZE = 40;
+ 
+   @Override
+   public TSDataType getType() {
 -    return TSDataType.Vector;
++    return TSDataType.VECTOR;
+   }
+ 
+   @Override
+   public int getStatsSize() {
+     return 0;
+   }
+ 
+   @Override
+   public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+     throw new StatisticsClassException("Time statistics does not support: set min max from bytes");
+   }
+ 
+   @Override
+   public Long getMinValue() {
+     throw new StatisticsClassException("Time statistics does not support: min value");
+   }
+ 
+   @Override
+   public Long getMaxValue() {
+     throw new StatisticsClassException("Time statistics does not support: max value");
+   }
+ 
+   @Override
+   public Long getFirstValue() {
+     throw new StatisticsClassException("Time statistics does not support: first value");
+   }
+ 
+   @Override
+   public Long getLastValue() {
+     throw new StatisticsClassException("Time statistics does not support: last value");
+   }
+ 
+   @Override
+   public double getSumDoubleValue() {
+     throw new StatisticsClassException("Time statistics does not support: double sum");
+   }
+ 
+   @Override
+   public long getSumLongValue() {
+     throw new StatisticsClassException("Time statistics does not support: long sum");
+   }
+ 
+   @Override
+   void updateStats(long value) {
+     throw new StatisticsClassException("Time statistics does not support: update stats");
+   }
+ 
+   @Override
+   void updateStats(long[] values, int batchSize) {
+     throw new StatisticsClassException("Time statistics does not support: update stats");
+   }
+ 
+   @Override
+   public void updateStats(long minValue, long maxValue) {
+     throw new StatisticsClassException("Time statistics does not support: update stats");
+   }
+ 
+   @Override
+   public long calculateRamSize() {
+     return TIME_STATISTICS_FIXED_RAM_SIZE;
+   }
+ 
+   @Override
+   protected void mergeStatisticsValue(Statistics stats) {}
+ 
+   @Override
+   public byte[] getMinValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get min value bytes");
+   }
+ 
+   @Override
+   public byte[] getMaxValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get max value bytes");
+   }
+ 
+   @Override
+   public byte[] getFirstValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get first value bytes");
+   }
+ 
+   @Override
+   public byte[] getLastValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get last value bytes");
+   }
+ 
+   @Override
+   public byte[] getSumValueBytes() {
+     throw new StatisticsClassException("Time statistics does not support: get sum value bytes");
+   }
+ 
+   @Override
+   public ByteBuffer getMinValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get min value bytes");
+   }
+ 
+   @Override
+   public ByteBuffer getMaxValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get max value buffer");
+   }
+ 
+   @Override
+   public ByteBuffer getFirstValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get first value buffer");
+   }
+ 
+   @Override
+   public ByteBuffer getLastValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get last value buffer");
+   }
+ 
+   @Override
+   public ByteBuffer getSumValueBuffer() {
+     throw new StatisticsClassException("Time statistics does not support: get sum value buffer");
+   }
+ 
+   @Override
+   public int serializeStats(OutputStream outputStream) {
+     return 0;
+   }
+ 
+   @Override
+   public void deserialize(InputStream inputStream) throws IOException {}
+ 
+   @Override
+   public void deserialize(ByteBuffer byteBuffer) {}
+ }
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index 0000000,df9ded9..522eff5
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@@ -1,0 -1,255 +1,255 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.chunk;
+ 
+ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+ import org.apache.iotdb.tsfile.compress.ICompressor;
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+ import org.apache.iotdb.tsfile.file.header.PageHeader;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.IOException;
+ 
+ public class TimeChunkWriter {
+ 
+   private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class);
+ 
+   private final String measurementId;
+ 
+   private final TSEncoding encodingType;
+ 
+   private final CompressionType compressionType;
+ 
+   /** all pages of this chunk. */
+   private final PublicBAOS pageBuffer;
+ 
+   private int numOfPages;
+ 
+   /** write data into current page */
+   private TimePageWriter pageWriter;
+ 
+   /** page size threshold. */
+   private final long pageSizeThreshold;
+ 
+   private final int maxNumberOfPointsInPage;
+ 
+   /** value count in current page. */
+   private int valueCountInOnePageForNextCheck;
+ 
+   // initial value for valueCountInOnePageForNextCheck
+   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;
+ 
+   /** statistic of this chunk. */
+   private TimeStatistics statistics;
+ 
+   /** first page info */
+   private int sizeWithoutStatistic;
+ 
+   private Statistics<?> firstPageStatistics;
+ 
+   public TimeChunkWriter(
+       String measurementId,
+       CompressionType compressionType,
+       TSEncoding encodingType,
+       Encoder timeEncoder) {
+     this.measurementId = measurementId;
+     this.encodingType = encodingType;
+     this.compressionType = compressionType;
+     this.pageBuffer = new PublicBAOS();
+ 
+     this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+     this.maxNumberOfPointsInPage =
+         TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+     // initial check of memory usage. So that we have enough data to make an initial prediction
+     this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ 
+     // init statistics for this chunk and page
+     this.statistics = new TimeStatistics();
+ 
+     this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
+   }
+ 
+   public void write(long time) {
+     pageWriter.write(time);
+   }
+ 
+   public void write(long[] timestamps, int batchSize) {
+     pageWriter.write(timestamps, batchSize);
+   }
+ 
+   /**
+    * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it
+    * to pageBuffer
+    */
+   public boolean checkPageSizeAndMayOpenANewPage() {
+     if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
+       logger.debug("current line count reaches the upper bound, write page {}", measurementId);
+       return true;
+     } else if (pageWriter.getPointNumber()
+         >= valueCountInOnePageForNextCheck) { // need to check memory size
+       // not checking the memory used for every value
+       long currentPageSize = pageWriter.estimateMaxMemSize();
+       if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
+         // we will write the current page
+         logger.debug(
+             "enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}",
+             measurementId,
+             pageSizeThreshold,
+             currentPageSize,
+             pageWriter.getPointNumber());
+         valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+         return true;
+       } else {
+         // reset the valueCountInOnePageForNextCheck for the next page
+         valueCountInOnePageForNextCheck =
+             (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
+       }
+     }
+     return false;
+   }
+ 
+   public void writePageToPageBuffer() {
+     try {
+       if (numOfPages == 0) { // record the firstPageStatistics
+         this.firstPageStatistics = pageWriter.getStatistics();
+         this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
+       } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+         byte[] b = pageBuffer.toByteArray();
+         pageBuffer.reset();
+         pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+         firstPageStatistics.serialize(pageBuffer);
+         pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+         pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+         firstPageStatistics = null;
+       } else {
+         pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
+       }
+ 
+       // update statistics of this chunk
+       numOfPages++;
+       this.statistics.mergeStatistics(pageWriter.getStatistics());
+     } catch (IOException e) {
+       logger.error("meet error in pageWriter.writePageHeaderAndDataIntoBuff,ignore this page:", e);
+     } finally {
+       // clear start time stamp for next initializing
+       pageWriter.reset();
+     }
+   }
+ 
+   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+     sealCurrentPage();
+     writeAllPagesOfChunkToTsFile(tsfileWriter);
+ 
+     // reinit this chunk writer
+     pageBuffer.reset();
+     numOfPages = 0;
+     firstPageStatistics = null;
+     this.statistics = new TimeStatistics();
+   }
+ 
+   public long estimateMaxSeriesMemSize() {
+     return pageBuffer.size()
+         + pageWriter.estimateMaxMemSize()
+         + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
+         + pageWriter.getStatistics().getSerializedSize();
+   }
+ 
+   public long getCurrentChunkSize() {
+     if (pageBuffer.size() == 0) {
+       return 0;
+     }
+     // return the serialized size of the chunk header + all pages
+     return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+         + (long) pageBuffer.size();
+   }
+ 
+   public void sealCurrentPage() {
+     if (pageWriter != null && pageWriter.getPointNumber() > 0) {
+       writePageToPageBuffer();
+     }
+   }
+ 
+   public void clearPageWriter() {
+     pageWriter = null;
+   }
+ 
+   public int getNumOfPages() {
+     return numOfPages;
+   }
+ 
+   public TSDataType getDataType() {
 -    return TSDataType.Vector;
++    return TSDataType.VECTOR;
+   }
+ 
+   /**
+    * write the page to specified IOWriter.
+    *
+    * @param writer the specified IOWriter
+    * @throws IOException exception in IO
+    */
+   public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
+     if (statistics.getCount() == 0) {
+       return;
+     }
+ 
+     // start to write this column chunk
+     writer.startFlushChunk(
+         measurementId,
+         compressionType,
 -        TSDataType.Vector,
++        TSDataType.VECTOR,
+         encodingType,
+         statistics,
+         pageBuffer.size(),
+         numOfPages,
+         0x80);
+ 
+     long dataOffset = writer.getPos();
+ 
+     // write all pages of this column
+     writer.writeBytesToStream(pageBuffer);
+ 
+     int dataSize = (int) (writer.getPos() - dataOffset);
+     if (dataSize != pageBuffer.size()) {
+       throw new IOException(
+           "Bytes written is inconsistent with the size of data: "
+               + dataSize
+               + " !="
+               + " "
+               + pageBuffer.size());
+     }
+ 
+     writer.endCurrentChunk();
+   }
+ 
+   /** only used for test */
+   public PublicBAOS getPageBuffer() {
+     return pageBuffer;
+   }
+ }
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
index 0000000,af71ecd..8f1e907
mode 000000,100644..100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/VectorChunkWriterImpl.java
@@@ -1,0 -1,202 +1,202 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.chunk;
+ 
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.Binary;
+ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ 
+ public class VectorChunkWriterImpl implements IChunkWriter {
+ 
+   private final TimeChunkWriter timeChunkWriter;
+   private final List<ValueChunkWriter> valueChunkWriterList;
+   private int valueIndex;
+ 
+   /** @param schema schema of this measurement */
+   public VectorChunkWriterImpl(IMeasurementSchema schema) {
+     timeChunkWriter =
+         new TimeChunkWriter(
+             schema.getMeasurementId(),
+             schema.getCompressor(),
+             schema.getTimeTSEncoding(),
+             schema.getTimeEncoder());
+ 
+     List<String> valueMeasurementIdList = schema.getValueMeasurementIdList();
+     List<TSDataType> valueTSDataTypeList = schema.getValueTSDataTypeList();
+     List<TSEncoding> valueTSEncodingList = schema.getValueTSEncodingList();
+     List<Encoder> valueEncoderList = schema.getValueEncoderList();
+ 
+     valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size());
+     for (int i = 0; i < valueMeasurementIdList.size(); i++) {
+       valueChunkWriterList.add(
+           new ValueChunkWriter(
+               valueMeasurementIdList.get(i),
+               schema.getCompressor(),
+               valueTSDataTypeList.get(i),
+               valueTSEncodingList.get(i),
+               valueEncoderList.get(i)));
+     }
+ 
+     this.valueIndex = 0;
+   }
+ 
+   @Override
+   public void write(long time, int value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, long value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, boolean value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, float value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, double value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time, Binary value, boolean isNull) {
+     valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
+   }
+ 
+   @Override
+   public void write(long time) {
+     valueIndex = 0;
+     timeChunkWriter.write(time);
+     if (checkPageSizeAndMayOpenANewPage()) {
+       writePageToPageBuffer();
+     }
+   }
+ 
+   // TODO tsfile write interface
+   @Override
+   public void write(long[] timestamps, int[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, long[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, boolean[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, float[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, double[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void write(long[] timestamps, Binary[] values, int batchSize) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it
+    * to pageBuffer
+    */
+   private boolean checkPageSizeAndMayOpenANewPage() {
+     return timeChunkWriter.checkPageSizeAndMayOpenANewPage();
+   }
+ 
+   private void writePageToPageBuffer() {
+     timeChunkWriter.writePageToPageBuffer();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       valueChunkWriter.writePageToPageBuffer();
+     }
+   }
+ 
+   @Override
+   public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
+     timeChunkWriter.writeToFileWriter(tsfileWriter);
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       valueChunkWriter.writeToFileWriter(tsfileWriter);
+     }
+   }
+ 
+   @Override
+   public long estimateMaxSeriesMemSize() {
+     long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize();
+     }
+     return estimateMaxSeriesMemSize;
+   }
+ 
+   @Override
+   public long getCurrentChunkSize() {
+     long currentChunkSize = timeChunkWriter.getCurrentChunkSize();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       currentChunkSize += valueChunkWriter.getCurrentChunkSize();
+     }
+     return currentChunkSize;
+   }
+ 
+   @Override
+   public void sealCurrentPage() {
+     timeChunkWriter.sealCurrentPage();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       valueChunkWriter.sealCurrentPage();
+     }
+   }
+ 
+   @Override
+   public void clearPageWriter() {
+     timeChunkWriter.clearPageWriter();
+     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+       valueChunkWriter.clearPageWriter();
+     }
+   }
+ 
+   @Override
+   public int getNumOfPages() {
+     return timeChunkWriter.getNumOfPages();
+   }
+ 
+   @Override
+   public TSDataType getDataType() {
 -    return TSDataType.Vector;
++    return TSDataType.VECTOR;
+   }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
index 0000000,9968815..bdca8d5
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimeChunkWriterTest.java
@@@ -1,0 -1,111 +1,111 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.writer;
+ 
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.MetaMarker;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
+ 
+ import org.junit.Test;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.fail;
+ 
+ public class TimeChunkWriterTest {
+ 
+   @Test
+   public void testWrite1() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     TimeChunkWriter chunkWriter =
+         new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+     for (long time = 1; time <= 10; time++) {
+       chunkWriter.write(time);
+     }
+     assertFalse(chunkWriter.checkPageSizeAndMayOpenANewPage());
+     chunkWriter.sealCurrentPage();
+     // page without statistics size: 82 + chunk header size: 8
+     assertEquals(90L, chunkWriter.getCurrentChunkSize());
+ 
+     try {
+       TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+       TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+       chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+       PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       assertEquals(
+           (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+       assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(82, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
 -      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++      assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(82, buffer.remaining());
+     } catch (IOException e) {
+       e.printStackTrace();
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWrite2() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     TimeChunkWriter chunkWriter =
+         new TimeChunkWriter("c1", CompressionType.UNCOMPRESSED, TSEncoding.PLAIN, timeEncoder);
+     for (long time = 1; time <= 10; time++) {
+       chunkWriter.write(time);
+     }
+     chunkWriter.sealCurrentPage();
+     for (long time = 11; time <= 20; time++) {
+       chunkWriter.write(time);
+     }
+     chunkWriter.sealCurrentPage();
+     assertEquals(2, chunkWriter.getNumOfPages());
+     // two pages with statistics size: (82 + 17) * 2 + chunk header size: 9
+     assertEquals(207L, chunkWriter.getCurrentChunkSize());
+ 
+     try {
+       TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+       TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+       chunkWriter.writeAllPagesOfChunkToTsFile(writer);
+       PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+       assertEquals("c1", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(198, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
 -      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++      assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(198, buffer.remaining());
+     } catch (IOException e) {
+       e.printStackTrace();
+       fail();
+     }
+   }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
index 0000000,2ec6294..cab975c
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TimePageWriterTest.java
@@@ -1,0 -1,171 +1,171 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.writer;
+ 
+ import org.apache.iotdb.tsfile.compress.ICompressor;
+ import org.apache.iotdb.tsfile.compress.IUnCompressor;
+ import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.page.TimePageWriter;
+ 
+ import org.junit.Test;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+ 
+ public class TimePageWriterTest {
+ 
+   @Test
+   public void testWrite() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+     TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+     try {
+       pageWriter.write(1L);
+       assertEquals(8, pageWriter.estimateMaxMemSize());
+       ByteBuffer buffer1 = pageWriter.getUncompressedBytes();
+       ByteBuffer buffer = ByteBuffer.wrap(buffer1.array());
+       pageWriter.reset();
+       assertEquals(0, pageWriter.estimateMaxMemSize());
+       byte[] timeBytes = new byte[8];
+       buffer.get(timeBytes);
+       ByteBuffer buffer2 = ByteBuffer.wrap(timeBytes);
+       PlainDecoder decoder = new PlainDecoder();
+       assertEquals(1L, decoder.readLong(buffer2));
+       decoder.reset();
+     } catch (IOException e) {
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWritePageHeaderAndDataIntoBuffWithoutCompress1() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+     TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+     PublicBAOS publicBAOS = new PublicBAOS();
+     try {
+       pageWriter.write(1L);
+       pageWriter.write(2L);
+       pageWriter.write(3L);
+       // without page statistics
+       assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+       // total size
+       assertEquals(26, publicBAOS.size());
+       TimeStatistics statistics = pageWriter.getStatistics();
+       assertEquals(1L, statistics.getStartTime());
+       assertEquals(3L, statistics.getEndTime());
+       assertEquals(3, statistics.getCount());
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // uncompressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       // compressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+       assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+       assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+     } catch (IOException e) {
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWritePageHeaderAndDataIntoBuffWithoutCompress2() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     ICompressor compressor = ICompressor.getCompressor(CompressionType.UNCOMPRESSED);
+     TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+     PublicBAOS publicBAOS = new PublicBAOS();
+     try {
+       pageWriter.write(1L);
+       pageWriter.write(2L);
+       pageWriter.write(3L);
+       // with page statistics
+       assertEquals(0, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, false));
+       // total size
+       assertEquals(43, publicBAOS.size());
+       TimeStatistics statistics = pageWriter.getStatistics();
+       assertEquals(1L, statistics.getStartTime());
+       assertEquals(3L, statistics.getEndTime());
+       assertEquals(3, statistics.getCount());
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // uncompressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       // compressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       TimeStatistics testStatistics =
 -          (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.Vector);
++          (TimeStatistics) TimeStatistics.deserialize(buffer, TSDataType.VECTOR);
+       assertEquals(1L, testStatistics.getStartTime());
+       assertEquals(3L, testStatistics.getEndTime());
+       assertEquals(3, testStatistics.getCount());
+       assertEquals(1L, ReadWriteIOUtils.readLong(buffer));
+       assertEquals(2L, ReadWriteIOUtils.readLong(buffer));
+       assertEquals(3L, ReadWriteIOUtils.readLong(buffer));
+     } catch (IOException e) {
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWritePageHeaderAndDataIntoBuffWithSnappy() {
+     Encoder timeEncoder = new PlainEncoder(TSDataType.INT64, 0);
+     ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
+     TimePageWriter pageWriter = new TimePageWriter(timeEncoder, compressor);
+     PublicBAOS publicBAOS = new PublicBAOS();
+     try {
+       pageWriter.write(1L);
+       pageWriter.write(2L);
+       pageWriter.write(3L);
+       // without page statistics
+       assertEquals(2, pageWriter.writePageHeaderAndDataIntoBuff(publicBAOS, true));
+ 
+       // total size
+       assertEquals(22, publicBAOS.size());
+       TimeStatistics statistics = pageWriter.getStatistics();
+       assertEquals(1L, statistics.getStartTime());
+       assertEquals(3L, statistics.getEndTime());
+       assertEquals(3, statistics.getCount());
+       ByteBuffer compressedBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // uncompressedSize
+       assertEquals(24, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+       // compressedSize
+       assertEquals(20, ReadWriteForEncodingUtils.readUnsignedVarInt(compressedBuffer));
+       byte[] compress = new byte[20];
+       compressedBuffer.get(compress);
+       byte[] uncompress = new byte[24];
+       IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+       unCompressor.uncompress(compress, 0, 20, uncompress, 0);
+       ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompress);
+       assertEquals(1L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+       assertEquals(2L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+       assertEquals(3L, ReadWriteIOUtils.readLong(uncompressedBuffer));
+     } catch (IOException e) {
+       fail();
+     }
+   }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
index 0000000,93e18bb..3ca81b1
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorChunkWriterImplTest.java
@@@ -1,0 -1,178 +1,178 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.writer;
+ 
+ import org.apache.iotdb.tsfile.file.MetaMarker;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.utils.PublicBAOS;
+ import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+ import org.apache.iotdb.tsfile.write.chunk.VectorChunkWriterImpl;
+ 
+ import org.junit.Test;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.fail;
+ 
+ public class VectorChunkWriterImplTest {
+ 
+   @Test
+   public void testWrite1() {
+     VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+     VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+ 
+     for (int time = 1; time <= 20; time++) {
+       chunkWriter.write(time, (float) time, false);
+       chunkWriter.write(time, time, false);
+       chunkWriter.write(time, (double) time, false);
+       chunkWriter.write(time);
+     }
+ 
+     chunkWriter.sealCurrentPage();
+     // time chunk: 14 + 4 + 160; value chunk 1: 8 + 2 + 4 + 3 + 80; value chunk 2: 8 + 2 + 4 + 3 +
+     // 20; value chunk 3: 9 + 4 + 7 + 20 * 8;
+     assertEquals(492L, chunkWriter.getCurrentChunkSize());
+ 
+     try {
+       TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+       TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+       chunkWriter.writeToFileWriter(writer);
+       PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // time chunk
+       assertEquals(
+           (byte) (0x80 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(164, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
 -      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++      assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 164);
+ 
+       // value chunk 1
+       assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(89, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 89);
+ 
+       // value chunk 2
+       assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(29, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 29);
+ 
+       // value chunk 2
+       assertEquals(0x40 | MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(171, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(171, buffer.remaining());
+     } catch (IOException e) {
+       e.printStackTrace();
+       fail();
+     }
+   }
+ 
+   @Test
+   public void testWrite2() {
+     VectorMeasurementSchemaStub measurementSchema = new VectorMeasurementSchemaStub();
+     VectorChunkWriterImpl chunkWriter = new VectorChunkWriterImpl(measurementSchema);
+ 
+     for (int time = 1; time <= 20; time++) {
+       chunkWriter.write(time, (float) time, false);
+       chunkWriter.write(time, time, false);
+       chunkWriter.write(time, (double) time, false);
+       chunkWriter.write(time);
+     }
+     chunkWriter.sealCurrentPage();
+     for (int time = 21; time <= 40; time++) {
+       chunkWriter.write(time, (float) time, false);
+       chunkWriter.write(time, time, false);
+       chunkWriter.write(time, (double) time, false);
+       chunkWriter.write(time);
+     }
+     chunkWriter.sealCurrentPage();
+ 
+     // time chunk: 14 + (4 + 17 + 160) * 2
+     // value chunk 1: 9 + (2 + 41 + 4 + 3 + 80) * 2
+     // value chunk 2: 9 + (2 + 41 + 4 + 3 + 20) * 2
+     // value chunk 3: 9 + (4 + 57 + 4 + 3 + 160) * 2
+     assertEquals(1259L, chunkWriter.getCurrentChunkSize());
+ 
+     try {
+       TestTsFileOutput testTsFileOutput = new TestTsFileOutput();
+       TsFileIOWriter writer = new TsFileIOWriter(testTsFileOutput, true);
+       chunkWriter.writeToFileWriter(writer);
+       PublicBAOS publicBAOS = testTsFileOutput.publicBAOS;
+       ByteBuffer buffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+       // time chunk
+       assertEquals((byte) (0x80 | MetaMarker.CHUNK_HEADER), ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s1.time", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(362, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
 -      assertEquals(TSDataType.Vector.serialize(), ReadWriteIOUtils.readByte(buffer));
++      assertEquals(TSDataType.VECTOR.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 362);
+ 
+       // value chunk 1
+       assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s1", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(260, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.FLOAT.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 260);
+ 
+       // value chunk 2
+       assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s2", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(140, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.INT32.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       buffer.position(buffer.position() + 140);
+ 
+       // value chunk 2
+       assertEquals(0x40 | MetaMarker.CHUNK_HEADER, ReadWriteIOUtils.readByte(buffer));
+       assertEquals("s3", ReadWriteIOUtils.readVarIntString(buffer));
+       assertEquals(456, ReadWriteForEncodingUtils.readUnsignedVarInt(buffer));
+       assertEquals(TSDataType.DOUBLE.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(CompressionType.UNCOMPRESSED.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(TSEncoding.PLAIN.serialize(), ReadWriteIOUtils.readByte(buffer));
+       assertEquals(456, buffer.remaining());
+ 
+     } catch (IOException e) {
+       e.printStackTrace();
+       fail();
+     }
+   }
+ }
diff --cc tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
index 0000000,795a0a6..40335f5
mode 000000,100644..100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java
@@@ -1,0 -1,80 +1,80 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.iotdb.tsfile.write.writer;
+ 
+ import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+ 
+ import java.util.Arrays;
+ import java.util.List;
+ 
+ public class VectorMeasurementSchemaStub implements IMeasurementSchema {
+ 
+   @Override
+   public String getMeasurementId() {
+     return "s1.time";
+   }
+ 
+   @Override
+   public CompressionType getCompressor() {
+     return CompressionType.UNCOMPRESSED;
+   }
+ 
+   @Override
+   public TSDataType getType() {
 -    return TSDataType.Vector;
++    return TSDataType.VECTOR;
+   }
+ 
+   @Override
+   public TSEncoding getTimeTSEncoding() {
+     return TSEncoding.PLAIN;
+   }
+ 
+   @Override
+   public Encoder getTimeEncoder() {
+     return new PlainEncoder(TSDataType.INT64, 0);
+   }
+ 
+   @Override
+   public List<String> getValueMeasurementIdList() {
+     return Arrays.asList("s1", "s2", "s3");
+   }
+ 
+   @Override
+   public List<TSDataType> getValueTSDataTypeList() {
+     return Arrays.asList(TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE);
+   }
+ 
+   @Override
+   public List<TSEncoding> getValueTSEncodingList() {
+     return Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN);
+   }
+ 
+   @Override
+   public List<Encoder> getValueEncoderList() {
+     return Arrays.asList(
+         new PlainEncoder(TSDataType.FLOAT, 0),
+         new PlainEncoder(TSDataType.INT32, 0),
+         new PlainEncoder(TSDataType.DOUBLE, 0));
+   }
+ }


[iotdb] 01/04: memtable

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb42aede25ab35a0a43d9e9ad3d14aa7a124765f
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 13:55:58 2021 +0800

    memtable
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   6 +
 .../iotdb/db/rescon/PrimitiveArrayManager.java     |   9 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  14 ++
 .../iotdb/db/utils/datastructure/VectorTVList.java | 230 +++++++++++++++++++++
 .../db/utils/datastructure/VectorTVListTest.java   |  67 ++++++
 .../tsfile/file/metadata/enums/TSDataType.java     |   6 +-
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java |  62 ++++++
 7 files changed, 392 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..dcd82ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -190,6 +190,12 @@ public class MemTableFlushTask {
               case TEXT:
                 seriesWriterImpl.write(time, tvPairs.getBinary(i));
                 break;
+              case VECTOR:
+                // TODO: 
+//                for ( : tvPairs.getVector(i)) {
+//                  seriesWriterImpl.write(time, tvPairs.getVector(i)[], get);
+//                }
+                break;
               default:
                 LOGGER.error(
                     "Storage group {} does not support data type: {}", storageGroup, dataType);
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index aa6c264..b471332 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
-
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +71,7 @@ public class PrimitiveArrayManager {
     bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>());
     bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
     bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>());
+    bufferedArraysMap.put(TSDataType.VECTOR, new ArrayDeque<>());
   }
 
   private PrimitiveArrayManager() {
@@ -127,6 +128,9 @@ public class PrimitiveArrayManager {
       case TEXT:
         dataArray = new Binary[ARRAY_SIZE];
         break;
+      case VECTOR:
+        dataArray = new byte[ARRAY_SIZE][];
+        break;
       default:
         throw new UnSupportedDataTypeException(dataType.toString());
     }
@@ -205,6 +209,9 @@ public class PrimitiveArrayManager {
     } else if (dataArray instanceof Binary[]) {
       Arrays.fill((Binary[]) dataArray, null);
       dataType = TSDataType.TEXT;
+    } else if (dataArray instanceof TsPrimitiveType[][]) {
+      Arrays.fill((TsPrimitiveType[][]) dataArray, null);
+      dataType = TSDataType.VECTOR;
     } else {
       throw new UnSupportedDataTypeException("Unknown data array type");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d24beae..3e0ef74 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -73,6 +73,8 @@ public abstract class TVList {
         return new DoubleTVList();
       case BOOLEAN:
         return new BooleanTVList();
+      case VECTOR:
+        return new VectorTVList();
       default:
         break;
     }
@@ -137,6 +139,10 @@ public abstract class TVList {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  public void putVector(long time, byte[] value) {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public void putLongs(long[] time, long[] value, int start, int end) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
@@ -161,6 +167,10 @@ public abstract class TVList {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  public void putVectors(long[] time, byte[][] value, int start, int end) {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public long getLong(int index) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
@@ -185,6 +195,10 @@ public abstract class TVList {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  public byte[] getVector(int index) {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public abstract void sort();
 
   public long getMinTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
new file mode 100644
index 0000000..e0a652a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
+
+public class VectorTVList extends TVList {
+
+  private List<Object[]> values;
+
+  private byte[][][] sortedValues;
+
+  private byte[] pivotValue;
+
+  VectorTVList() {
+    super();
+    values = new ArrayList<>();
+  }
+
+  @Override
+  public void putVector(long timestamp, byte[] value) {
+    checkExpansion();
+    int arrayIndex = size / ARRAY_SIZE;
+    int elementIndex = size % ARRAY_SIZE;
+    minTime = Math.min(minTime, timestamp);
+    timestamps.get(arrayIndex)[elementIndex] = timestamp;
+    values.get(arrayIndex)[elementIndex] = value;
+    size++;
+    if (sorted && size > 1 && timestamp < getTime(size - 2)) {
+      sorted = false;
+    }
+  }
+
+  @Override
+  public byte[] getVector(int index) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    int arrayIndex = index / ARRAY_SIZE;
+    int elementIndex = index % ARRAY_SIZE;
+    return (byte[]) values.get(arrayIndex)[elementIndex];
+  }
+
+  protected void set(int index, long timestamp, byte[] value) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    int arrayIndex = index / ARRAY_SIZE;
+    int elementIndex = index % ARRAY_SIZE;
+    timestamps.get(arrayIndex)[elementIndex] = timestamp;
+    values.get(arrayIndex)[elementIndex] = value;
+  }
+
+  @Override
+  public VectorTVList clone() {
+    VectorTVList cloneList = new VectorTVList();
+    cloneAs(cloneList);
+    for (Object[] valueArray : values) {
+      cloneList.values.add(cloneValue(valueArray));
+    }
+    return cloneList;
+  }
+
+  private TsPrimitiveType[][] cloneValue(Object[] valueArray) {
+    TsPrimitiveType[][] cloneArray = (TsPrimitiveType[][])new Object[valueArray.length];
+    System.arraycopy(valueArray, 0, cloneArray, 0, valueArray.length);
+    return cloneArray;
+  }
+
+  @Override
+  public void sort() {
+    if (sortedTimestamps == null || sortedTimestamps.length < size) {
+      sortedTimestamps =
+          (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, size);
+    }
+    if (sortedValues == null || sortedValues.length < size) {
+      sortedValues =
+          (byte[][][])
+              PrimitiveArrayManager.createDataListsByType(TSDataType.VECTOR, size);
+    }
+    sort(0, size);
+    clearSortedValue();
+    clearSortedTime();
+    sorted = true;
+  }
+
+  @Override
+  void clearValue() {
+    if (values != null) {
+      for (Object[] dataArray : values) {
+        PrimitiveArrayManager.release(dataArray);
+      }
+      values.clear();
+    }
+  }
+
+  @Override
+  void clearSortedValue() {
+    if (sortedValues != null) {
+      sortedValues = null;
+    }
+  }
+
+  @Override
+  protected void setFromSorted(int src, int dest) {
+    set(
+        dest,
+        sortedTimestamps[src / ARRAY_SIZE][src % ARRAY_SIZE],
+        sortedValues[src / ARRAY_SIZE][src % ARRAY_SIZE]);
+  }
+
+  @Override
+  protected void set(int src, int dest) {
+    long srcT = getTime(src);
+    byte[] srcV = getVector(src);
+    set(dest, srcT, srcV);
+  }
+
+  @Override
+  protected void setToSorted(int src, int dest) {
+    sortedTimestamps[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getTime(src);
+    sortedValues[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getVector(src);
+  }
+
+  @Override
+  protected void reverseRange(int lo, int hi) {
+    hi--;
+    while (lo < hi) {
+      long loT = getTime(lo);
+      byte[] loV = getVector(lo);
+      long hiT = getTime(hi);
+      byte[] hiV = getVector(hi);
+      set(lo++, hiT, hiV);
+      set(hi--, loT, loV);
+    }
+  }
+
+  @Override
+  protected void expandValues() {
+    values.add((Object[]) getPrimitiveArraysByType(TSDataType.VECTOR));
+  }
+
+  @Override
+  protected void saveAsPivot(int pos) {
+    pivotTime = getTime(pos);
+    pivotValue = getVector(pos);
+  }
+
+  @Override
+  protected void setPivotTo(int pos) {
+    set(pos, pivotTime, pivotValue);
+  }
+
+  @Override
+  public TimeValuePair getTimeValuePair(int index) {
+    return new TimeValuePair(
+        getTime(index), TsPrimitiveType.getByType(TSDataType.VECTOR, getVector(index)));
+  }
+
+  @Override
+  protected TimeValuePair getTimeValuePair(
+      int index, long time, Integer floatPrecision, TSEncoding encoding) {
+    return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.VECTOR, getBinary(index)));
+  }
+
+  @Override
+  protected void releaseLastValueArray() {
+    PrimitiveArrayManager.release(values.remove(values.size() - 1));
+  }
+
+  @Override
+  public void putVectors(long[] time, byte[][] value, int start, int end) {
+    checkExpansion();
+    int idx = start;
+
+    updateMinTimeAndSorted(time, start, end);
+
+    while (idx < end) {
+      int inputRemaining = end - idx;
+      int arrayIdx = size / ARRAY_SIZE;
+      int elementIdx = size % ARRAY_SIZE;
+      int internalRemaining = ARRAY_SIZE - elementIdx;
+      if (internalRemaining >= inputRemaining) {
+        // the remaining inputs can fit the last array, copy all remaining inputs into last array
+        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining);
+        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining);
+        size += inputRemaining;
+        break;
+      } else {
+        // the remaining inputs cannot fit the last array, fill the last array and create a new
+        // one and enter the next loop
+        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
+        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
+        idx += internalRemaining;
+        size += internalRemaining;
+        checkExpansion();
+      }
+    }
+  }
+
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.VECTOR;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
new file mode 100644
index 0000000..7095612
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class VectorTVListTest {
+
+  @Test
+  public void testVectorTVList() {
+    VectorTVList tvList = new VectorTVList();
+    for (int i = 0; i < 1000; i++) {
+      byte[] value = new byte[4 * 5];
+      byte[] bytes = new byte[4];
+      for (int j = 0; j < 20; j++) {
+        if (j % 4 == 0) {
+          bytes = BytesUtils.intToBytes(i);
+        }
+        value[j] = bytes[j % 4];
+      }
+      tvList.putVector(i, value);
+    }
+    for (int i = 0; i < tvList.size; i++) {
+      Assert.assertEquals(String.valueOf(i), tvList.getVector(i).toString());
+      Assert.assertEquals(i, tvList.getTime(i));
+    }
+  }
+
+  @Test
+  public void testVectorTVLists() {
+    VectorTVList tvList = new VectorTVList();
+    byte[][] vectorList = new byte[1001][4 * 5];
+    List<Long> timeList = new ArrayList<>();
+    for (int i = 1000; i >= 0; i--) {
+      timeList.add((long) i);
+      for (int j = 0; j < 5; j++) {
+        vectorList[i][j] = 0;
+      }
+    }
+    tvList.putVectors(ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorList, 0, 1000);
+    for (long i = 0; i < tvList.size; i++) {
+      Assert.assertEquals(tvList.size - i, tvList.getTime((int) i));
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 0edda45..02a2086 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -41,7 +41,10 @@ public enum TSDataType {
   DOUBLE((byte) 4),
 
   /** TEXT */
-  TEXT((byte) 5);
+  TEXT((byte) 5),
+
+  /** VECTOR */
+  VECTOR((byte) 6);
 
   private final byte type;
 
@@ -96,6 +99,7 @@ public enum TSDataType {
       case TEXT:
       case INT64:
       case DOUBLE:
+      case VECTOR:
         return 8;
       default:
         throw new UnSupportedDataTypeException(this.toString());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 73b01d2..32c621f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -45,11 +45,23 @@ public abstract class TsPrimitiveType implements Serializable {
         return new TsPrimitiveType.TsDouble((double) v);
       case TEXT:
         return new TsPrimitiveType.TsBinary((Binary) v);
+      case VECTOR:
+        return new TsPrimitiveType.TsVector((TsPrimitiveType[]) v);
       default:
         throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
     }
   }
 
+  public void setVector(TsPrimitiveType[] val) {
+    // TODO Auto-generated method stub
+
+  }
+
+  public TsPrimitiveType[] getVector() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
   public boolean getBoolean() {
     throw new UnsupportedOperationException("getBoolean() is not supported for current sub-class");
   }
@@ -462,4 +474,54 @@ public abstract class TsPrimitiveType implements Serializable {
       return false;
     }
   }
+
+  public static class TsVector extends TsPrimitiveType {
+
+    private TsPrimitiveType[] value;
+
+    public TsVector(TsPrimitiveType[] value) {
+      this.value = value;
+    }
+
+    @Override
+    public TsPrimitiveType[] getVector() {
+      return value;
+    }
+
+    @Override
+    public void setVector(TsPrimitiveType[] val) {
+      this.value = val;
+    }
+
+    @Override
+    public int getSize() {
+      int size = 0;
+      for (TsPrimitiveType type : value) {
+        size += type.getSize();
+      }
+      // object header + array object header
+      return 4 + 4 + size;
+    }
+
+    @Override
+    public Object getValue() {
+      return getVector();
+    }
+
+    @Override
+    public String getStringValue() {
+      StringBuilder builder = new StringBuilder("[");
+      builder.append(value[0].getStringValue());
+      for (TsPrimitiveType type : value) {
+        builder.append(", ").append(type.getStringValue());
+      }
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    public TSDataType getDataType() {
+      return TSDataType.VECTOR;
+    }
+  }
 }