You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 09:39:23 UTC
[iotdb] 03/04: Memtable
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27883e0c9485f503fc721037440b3c9ce1b6e41d
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 17:07:09 2021 +0800
Memtable
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 3 +-
.../iotdb/db/engine/flush/NotifyFlushMemTable.java | 4 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 48 +++++++++++++++++++---
.../apache/iotdb/db/engine/memtable/IMemTable.java | 7 ++++
.../db/engine/memtable/IWritableMemChunk.java | 6 ++-
.../db/engine/memtable/PrimitiveMemTable.java | 3 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 15 +++++--
.../db/engine/querycontext/ReadOnlyMemChunk.java | 3 ++
.../tsfile/write/schema/MeasurementSchema.java | 34 ++++++++++++++-
9 files changed, 108 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index c1bc5a9..4134d1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -110,7 +111,7 @@ public class MemTableFlushTask {
for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = iWritableMemChunkEntry.getValue();
- MeasurementSchema desc = series.getSchema();
+ IMeasurementSchema desc = series.getSchema();
TVList tvList = series.getSortedTVListForFlush();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.put(new Pair<>(tvList, desc));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 5ba50d0..4aeb66c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.flush;
import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
/**
* Only used in sync flush and async close to start a flush task. This memtable is not managed by
@@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public class NotifyFlushMemTable extends AbstractMemTable {
@Override
- protected IWritableMemChunk genMemSeries(MeasurementSchema schema) {
+ protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 21c9bf2..dd18de4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
@@ -93,12 +94,12 @@ public abstract class AbstractMemTable implements IMemTable {
}
private IWritableMemChunk createIfNotExistAndGet(
- String deviceId, String measurement, MeasurementSchema schema) {
+ String deviceId, IMeasurementSchema schema) {
Map<String, IWritableMemChunk> memSeries =
memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
return memSeries.computeIfAbsent(
- measurement,
+ schema.getMeasurementId(),
k -> {
seriesNumber++;
totalPointsNumThreshold += avgSeriesPointNumThreshold;
@@ -106,7 +107,7 @@ public abstract class AbstractMemTable implements IMemTable {
});
}
- protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema);
+ protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema);
@Override
public void insert(InsertRowPlan insertRowPlan) {
@@ -138,6 +139,34 @@ public abstract class AbstractMemTable implements IMemTable {
}
@Override
+ public void insert(InsertVectorPlan insertVectorPlan) {
+ updatePlanIndexes(insertVectorPlan.getIndex());
+ Object[] values = insertVectorPlan.getValues();
+
+ MeasurementMNode[] measurementMNodes = insertVectorPlan.getMeasurementMNodes();
+ String[] measurements = insertVectorPlan.getMeasurements();
+ IMeasurementSchema vmSchema = (IMeasurementSchema) measurementMNodes[0].getSchema();
+ for (int i = 0; i < values.length; i++) {
+ Object value = values[i];
+ if (value == null) {
+ continue;
+ }
+
+ memSize +=
+ MemUtils.getRecordSize(
+ vmSchema.getValueTSDataTypeList().get(i), value, disableMemControl);
+ }
+ write(
+ insertVectorPlan.getDeviceId().getFullPath(),
+ vmSchema,
+ insertVectorPlan.getTime(),
+ values);
+
+ totalPointsNum +=
+ insertVectorPlan.getMeasurements().length - insertVectorPlan.getFailedMeasurementNumber();
+ }
+
+ @Override
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
throws WriteProcessException {
updatePlanIndexes(insertTabletPlan.getIndex());
@@ -160,7 +189,17 @@ public abstract class AbstractMemTable implements IMemTable {
MeasurementSchema schema,
long insertTime,
Object objectValue) {
- IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, measurement, schema);
+ IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
+ memSeries.write(insertTime, objectValue);
+ }
+
+ @Override
+ public void write(
+ String deviceId,
+ IMeasurementSchema schema,
+ long insertTime,
+ Object objectValue) {
+ IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema);
memSeries.write(insertTime, objectValue);
}
@@ -174,7 +213,6 @@ public abstract class AbstractMemTable implements IMemTable {
IWritableMemChunk memSeries =
createIfNotExistAndGet(
insertTabletPlan.getDeviceId().getFullPath(),
- insertTabletPlan.getMeasurements()[i],
insertTabletPlan.getMeasurementMNodes()[i].getSchema());
memSeries.write(
insertTabletPlan.getTimes(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index ce412a2..3a46e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
@@ -53,6 +54,12 @@ public interface IMemTable {
long insertTime,
Object objectValue);
+ void write(
+ String deviceId,
+ IMeasurementSchema schema,
+ long insertTime,
+ Object objectValue);
+
void write(InsertTabletPlan insertTabletPlan, int start, int end);
/** @return the number of points */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index bdb4bbc..10298e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.memtable;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
public interface IWritableMemChunk {
@@ -36,6 +36,8 @@ public interface IWritableMemChunk {
void putBinary(long t, Binary v);
void putBoolean(long t, boolean v);
+
+ void putVector(long t, byte[] v);
void putLongs(long[] t, long[] v, int start, int end);
@@ -56,7 +58,7 @@ public interface IWritableMemChunk {
long count();
- MeasurementSchema getSchema();
+ IMeasurementSchema getSchema();
/**
* served for query requests.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 254d722..0336c61 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.memtable;
import org.apache.iotdb.db.rescon.TVListAllocator;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.HashMap;
@@ -38,7 +39,7 @@ public class PrimitiveMemTable extends AbstractMemTable {
}
@Override
- protected IWritableMemChunk genMemSeries(MeasurementSchema schema) {
+ protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
return new WritableMemChunk(schema, TVListAllocator.getInstance().allocate(schema.getType()));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 4536896..8cf3422 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -22,14 +22,15 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public class WritableMemChunk implements IWritableMemChunk {
- private MeasurementSchema schema;
+ private IMeasurementSchema schema;
private TVList list;
- public WritableMemChunk(MeasurementSchema schema, TVList list) {
+ public WritableMemChunk(IMeasurementSchema schema, TVList list) {
this.schema = schema;
this.list = list;
}
@@ -55,6 +56,9 @@ public class WritableMemChunk implements IWritableMemChunk {
case TEXT:
putBinary(insertTime, (Binary) objectValue);
break;
+ case VECTOR:
+ putVector(insertTime, (byte[]) objectValue);
+ break;
default:
throw new UnSupportedDataTypeException("Unsupported data type:" + schema.getType());
}
@@ -123,6 +127,11 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
+ public void putVector(long t, byte[] v) {
+ list.putVector(t, v);
+ }
+
+ @Override
public void putLongs(long[] t, long[] v, int start, int end) {
list.putLongs(t, v, start, end);
}
@@ -188,7 +197,7 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public MeasurementSchema getSchema() {
+ public IMeasurementSchema getSchema() {
return schema;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index 8236e15..791af95 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -124,6 +124,9 @@ public class ReadOnlyMemChunk {
case DOUBLE:
statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
break;
+ case VECTOR:
+ statsByType.update(timeValuePair.getTimestamp());
+ break;
default:
throw new QueryProcessException("Unsupported data type:" + dataType);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
index 34ee496..3b2368d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java
@@ -33,6 +33,7 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -42,7 +43,8 @@ import java.util.Objects;
* MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has
* TSDataTypeConverter up to now.
*/
-public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable {
+public class MeasurementSchema implements Comparable<MeasurementSchema>,
+ Serializable, IMeasurementSchema {
public static final MeasurementSchema TIME_SCHEMA =
new MeasurementSchema(
@@ -323,4 +325,34 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali
public void setType(TSDataType type) {
this.type = type.serialize();
}
+
+ @Override
+ public TSEncoding getTimeTSEncoding() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<String> getValueMeasurementIdList() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getValueTSDataTypeList() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<TSEncoding> getValueTSEncodingList() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<Encoder> getValueEncoderList() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}