You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 09:39:23 UTC

[iotdb] 03/04: Memtable

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 27883e0c9485f503fc721037440b3c9ce1b6e41d
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 17:07:09 2021 +0800

    Memtable
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  3 +-
 .../iotdb/db/engine/flush/NotifyFlushMemTable.java |  4 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java | 48 +++++++++++++++++++---
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  7 ++++
 .../db/engine/memtable/IWritableMemChunk.java      |  6 ++-
 .../db/engine/memtable/PrimitiveMemTable.java      |  3 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java | 15 +++++--
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |  3 ++
 .../tsfile/write/schema/MeasurementSchema.java     | 34 ++++++++++++++-
 9 files changed, 108 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index c1bc5a9..4134d1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
@@ -110,7 +111,7 @@ public class MemTableFlushTask {
       for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = iWritableMemChunkEntry.getValue();
-        MeasurementSchema desc = series.getSchema();
+        IMeasurementSchema desc = series.getSchema();
         TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.put(new Pair<>(tvList, desc));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 5ba50d0..4aeb66c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.flush;
 import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 /**
  * Only used in sync flush and async close to start a flush task This memtable is not managed by
@@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 public class NotifyFlushMemTable extends AbstractMemTable {
 
   @Override
-  protected IWritableMemChunk genMemSeries(MeasurementSchema schema) {
+  protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 21c9bf2..dd18de4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
@@ -93,12 +94,12 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   private IWritableMemChunk createIfNotExistAndGet(
-      String deviceId, String measurement, MeasurementSchema schema) {
+      String deviceId, IMeasurementSchema schema) {
     Map<String, IWritableMemChunk> memSeries =
         memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
 
     return memSeries.computeIfAbsent(
-        measurement,
+        schema.getMeasurementId(),
         k -> {
           seriesNumber++;
           totalPointsNumThreshold += avgSeriesPointNumThreshold;
@@ -106,7 +107,7 @@ public abstract class AbstractMemTable implements IMemTable {
         });
   }
 
-  protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema);
+  protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema);
 
   @Override
   public void insert(InsertRowPlan insertRowPlan) {
@@ -138,6 +139,34 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   @Override
+  public void insert(InsertVectorPlan insertVectorPlan) {
+    updatePlanIndexes(insertVectorPlan.getIndex());
+    Object[] values = insertVectorPlan.getValues();
+
+    MeasurementMNode[] measurementMNodes = insertVectorPlan.getMeasurementMNodes();
+    String[] measurements = insertVectorPlan.getMeasurements();
+    IMeasurementSchema vmSchema = (IMeasurementSchema) measurementMNodes[0].getSchema();
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      if (value == null) {
+        continue;
+      }
+
+      memSize +=
+          MemUtils.getRecordSize(
+              vmSchema.getValueTSDataTypeList().get(i), value, disableMemControl);
+    }
+    write(
+        insertVectorPlan.getDeviceId().getFullPath(),
+        vmSchema,
+        insertVectorPlan.getTime(),
+        values);
+
+    totalPointsNum +=
+        insertVectorPlan.getMeasurements().length - insertVectorPlan.getFailedMeasurementNumber();
+  }
+
+  @Override
   public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
       throws WriteProcessException {
     updatePlanIndexes(insertTabletPlan.getIndex());
@@ -160,7 +189,17 @@ public abstract class AbstractMemTable implements IMemTable {
       MeasurementSchema schema,
       long insertTime,
       Object objectValue) {
-    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, measurement, schema);
+    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
+    memSeries.write(insertTime, objectValue);
+  }
+
+  @Override
+  public void write(
+      String deviceId,
+      IMeasurementSchema schema,
+      long insertTime,
+      Object objectValue) {
+    IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
     memSeries.write(insertTime, objectValue);
   }
 
@@ -174,7 +213,6 @@ public abstract class AbstractMemTable implements IMemTable {
       IWritableMemChunk memSeries =
           createIfNotExistAndGet(
               insertTabletPlan.getDeviceId().getFullPath(),
-              insertTabletPlan.getMeasurements()[i],
               insertTabletPlan.getMeasurementMNodes()[i].getSchema());
       memSeries.write(
           insertTabletPlan.getTimes(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index ce412a2..3a46e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
@@ -53,6 +54,12 @@ public interface IMemTable {
       long insertTime,
       Object objectValue);
 
+  void write(
+      String deviceId,
+      IMeasurementSchema schema,
+      long insertTime,
+      Object objectValue);
+
   void write(InsertTabletPlan insertTabletPlan, int start, int end);
 
   /** @return the number of points */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index bdb4bbc..10298e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.memtable;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
 public interface IWritableMemChunk {
 
@@ -36,6 +36,8 @@ public interface IWritableMemChunk {
   void putBinary(long t, Binary v);
 
   void putBoolean(long t, boolean v);
+  
+  void putVector(long t, byte[] v);
 
   void putLongs(long[] t, long[] v, int start, int end);
 
@@ -56,7 +58,7 @@ public interface IWritableMemChunk {
 
   long count();
 
-  MeasurementSchema getSchema();
+  IMeasurementSchema getSchema();
 
   /**
    * served for query requests.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 254d722..0336c61 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.HashMap;
@@ -38,7 +39,7 @@ public class PrimitiveMemTable extends AbstractMemTable {
   }
 
   @Override
-  protected IWritableMemChunk genMemSeries(MeasurementSchema schema) {
+  protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
     return new WritableMemChunk(schema, TVListAllocator.getInstance().allocate(schema.getType()));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 4536896..8cf3422 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -22,14 +22,15 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 public class WritableMemChunk implements IWritableMemChunk {
 
-  private MeasurementSchema schema;
+  private IMeasurementSchema schema;
   private TVList list;
 
-  public WritableMemChunk(MeasurementSchema schema, TVList list) {
+  public WritableMemChunk(IMeasurementSchema schema, TVList list) {
     this.schema = schema;
     this.list = list;
   }
@@ -55,6 +56,9 @@ public class WritableMemChunk implements IWritableMemChunk {
       case TEXT:
         putBinary(insertTime, (Binary) objectValue);
         break;
+      case VECTOR:
+        putVector(insertTime, (byte[]) objectValue);
+        break;
       default:
         throw new UnSupportedDataTypeException("Unsupported data type:" + schema.getType());
     }
@@ -123,6 +127,11 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
+  public void putVector(long t, byte[] v) {
+    list.putVector(t, v);
+  }
+
+  @Override
   public void putLongs(long[] t, long[] v, int start, int end) {
     list.putLongs(t, v, start, end);
   }
@@ -188,7 +197,7 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public MeasurementSchema getSchema() {
+  public IMeasurementSchema getSchema() {
     return schema;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index 8236e15..791af95 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -124,6 +124,9 @@ public class ReadOnlyMemChunk {
           case DOUBLE:
             statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
             break;
+          case VECTOR:
+            statsByType.update(timeValuePair.getTimestamp());
+            break;
           default:
             throw new QueryProcessException("Unsupported data type:" + dataType);
         }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 34ee496..3b2368d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -33,6 +33,7 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -42,7 +43,8 @@ import java.util.Objects;
  * MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has
  * TSDataTypeConverter up to now.
  */
-public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable {
+public class MeasurementSchema implements Comparable<MeasurementSchema>,
+    Serializable, IMeasurementSchema {
 
   public static final MeasurementSchema TIME_SCHEMA =
       new MeasurementSchema(
@@ -323,4 +325,34 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
   public void setType(TSDataType type) {
     this.type = type.serialize();
   }
+
+  @Override
+  public TSEncoding getTimeTSEncoding() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<String> getValueMeasurementIdList() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getValueTSDataTypeList() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<TSEncoding> getValueTSEncodingList() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<Encoder> getValueEncoderList() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }