You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/17 03:22:29 UTC
[iotdb] 01/02: Introduce IWritableMemChunkGroup
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch IWritableMemChunkGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0cd21e12749228eea857e70829ef305cac412ee2
Author: HTHou <hh...@outlook.com>
AuthorDate: Wed Nov 17 11:20:01 2021 +0800
Introduce IWritableMemChunkGroup
---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 5 +-
.../iotdb/db/engine/flush/NotifyFlushMemTable.java | 12 --
.../iotdb/db/engine/memtable/AbstractMemTable.java | 237 +++++++++------------
.../engine/memtable/AlignedWritableMemChunk.java | 104 +++++----
.../memtable/AlignedWritableMemChunkGroup.java | 62 ++++++
.../apache/iotdb/db/engine/memtable/IMemTable.java | 9 +-
.../db/engine/memtable/IWritableMemChunk.java | 7 +-
.../db/engine/memtable/IWritableMemChunkGroup.java | 30 +++
.../db/engine/memtable/PrimitiveMemTable.java | 17 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 12 +-
.../db/engine/memtable/WritableMemChunkGroup.java | 99 +++++++++
.../db/engine/storagegroup/TsFileProcessor.java | 29 +--
.../apache/iotdb/db/metadata/path/AlignedPath.java | 7 +-
.../iotdb/db/metadata/path/MeasurementPath.java | 8 +-
.../apache/iotdb/db/metadata/path/PartialPath.java | 4 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 6 +-
.../java/org/apache/iotdb/db/utils/MemUtils.java | 15 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 22 +-
.../db/engine/memtable/MemTableTestUtils.java | 6 +-
.../db/engine/memtable/MemtableBenchmark.java | 7 +-
.../db/engine/memtable/PrimitiveMemTableTest.java | 14 +-
.../tsfile/write/chunk/AlignedChunkWriterImpl.java | 28 +++
23 files changed, 481 insertions(+), 268 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 17deb09..1d0ae09 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1281,13 +1281,10 @@ public class IoTDBDescriptor {
+ queryMemoryAllocateProportion);
}
}
- }
- conf.setMaxQueryDeduplicatedPathNum(
- Integer.parseInt(
- properties.getProperty(
- "max_deduplicated_path_num",
- Integer.toString(conf.getMaxQueryDeduplicatedPathNum()))));
+ conf.setMaxQueryDeduplicatedPathNum(
+ Integer.parseInt(properties.getProperty("max_deduplicated_path_num")));
+ }
}
@SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 957d03e..ecbdac8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -101,11 +102,11 @@ public class MemTableFlushTask {
long sortTime = 0;
// for map do not use get(key) to iterate
- for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry :
+ for (Map.Entry<String, IWritableMemChunkGroup> memTableEntry :
memTable.getMemTableMap().entrySet()) {
encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey()));
- final Map<String, IWritableMemChunk> value = memTableEntry.getValue();
+ final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap();
for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = iWritableMemChunkEntry.getValue();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 169c22f..8d80b8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.engine.flush;
import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
/**
* Only used in sync flush and async close to start a flush task This memtable is not managed by
@@ -30,16 +28,6 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
public class NotifyFlushMemTable extends AbstractMemTable {
@Override
- protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
- return null;
- }
-
- @Override
- protected IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema) {
- return null;
- }
-
- @Override
public IMemTable copy() {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 72c9b72..c5e8ae2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -22,20 +22,13 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import java.io.IOException;
import java.util.ArrayList;
@@ -47,7 +40,7 @@ import java.util.Map.Entry;
public abstract class AbstractMemTable implements IMemTable {
- private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
+ private final Map<String, IWritableMemChunkGroup> memTableMap;
/**
* The initial value is true because we want calculate the text data size when recover memTable!!
*/
@@ -80,12 +73,12 @@ public abstract class AbstractMemTable implements IMemTable {
this.memTableMap = new HashMap<>();
}
- public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+ public AbstractMemTable(Map<String, IWritableMemChunkGroup> memTableMap) {
this.memTableMap = memTableMap;
}
@Override
- public Map<String, Map<String, IWritableMemChunk>> getMemTableMap() {
+ public Map<String, IWritableMemChunkGroup> getMemTableMap() {
return memTableMap;
}
@@ -93,63 +86,64 @@ public abstract class AbstractMemTable implements IMemTable {
* create this MemChunk if it's not exist
*
* @param deviceId device id
- * @param schema measurement schema
- * @return this MemChunk
+ * @param schemaList measurement schemaList
+ * @return this MemChunkGroup
*/
- private IWritableMemChunk createMemChunkIfNotExistAndGet(
- String deviceId, IMeasurementSchema schema) {
- Map<String, IWritableMemChunk> memSeries =
- memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
-
- return memSeries.computeIfAbsent(
- schema.getMeasurementId(),
- k -> {
- seriesNumber++;
- totalPointsNumThreshold += avgSeriesPointNumThreshold;
- return genMemSeries(schema);
- });
- }
-
- private IWritableMemChunk createAlignedMemChunkIfNotExistAndGet(
- String deviceId, IMeasurementSchema schema) {
- Map<String, IWritableMemChunk> memSeries =
- memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>());
-
- VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema;
- return memSeries.computeIfAbsent(
- vectorSchema.getMeasurementId(),
- k -> {
- seriesNumber++;
- totalPointsNumThreshold +=
- avgSeriesPointNumThreshold * vectorSchema.getSubMeasurementsCount();
- return genAlignedMemSeries(vectorSchema);
- });
+ private IWritableMemChunkGroup createMemChunkGroupIfNotExistAndGet(
+ String deviceId, List<IMeasurementSchema> schemaList) {
+ IWritableMemChunkGroup memChunkGroup =
+ memTableMap.computeIfAbsent(
+ deviceId,
+ k -> {
+ seriesNumber++;
+ totalPointsNumThreshold += avgSeriesPointNumThreshold;
+ return new WritableMemChunkGroup(schemaList);
+ });
+ for (IMeasurementSchema schema : schemaList) {
+ if (!memChunkGroup.contains(schema.getMeasurementId())) {
+ seriesNumber++;
+ totalPointsNumThreshold += avgSeriesPointNumThreshold;
+ }
+ }
+ return memChunkGroup;
+ }
+
+ private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
+ String deviceId, List<IMeasurementSchema> schemaList) {
+ IWritableMemChunkGroup memChunkGroup =
+ memTableMap.computeIfAbsent(
+ deviceId,
+ k -> {
+ seriesNumber++;
+ totalPointsNumThreshold += avgSeriesPointNumThreshold;
+ return new AlignedWritableMemChunkGroup(schemaList);
+ });
+ for (IMeasurementSchema schema : schemaList) {
+ if (!memChunkGroup.contains(schema.getMeasurementId())) {
+ seriesNumber++;
+ totalPointsNumThreshold += avgSeriesPointNumThreshold;
+ }
+ }
+ return memChunkGroup;
}
- protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema);
-
- protected abstract IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema);
-
@Override
public void insert(InsertRowPlan insertRowPlan) {
updatePlanIndexes(insertRowPlan.getIndex());
Object[] values = insertRowPlan.getValues();
- IMeasurementMNode[] measurementMNodes = insertRowPlan.getMeasurementMNodes();
- for (int i = 0; i < measurementMNodes.length; i++) {
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
if (values[i] == null) {
continue;
}
- memSize +=
- MemUtils.getRecordSize(
- measurementMNodes[i].getSchema().getType(), values[i], disableMemControl);
-
- write(
- insertRowPlan.getDeviceId().getFullPath(),
- measurementMNodes[i].getSchema(),
- insertRowPlan.getTime(),
- values[i]);
+ IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
+ schemaList.add(schema);
+ dataTypes.add(schema.getType());
}
+ memSize += MemUtils.getRecordsSize(dataTypes, values, disableMemControl);
+ write(insertRowPlan.getDeviceId().getFullPath(), schemaList, insertRowPlan.getTime(), values);
totalPointsNum +=
insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
}
@@ -158,34 +152,24 @@ public abstract class AbstractMemTable implements IMemTable {
public void insertAlignedRow(InsertRowPlan insertRowPlan) {
updatePlanIndexes(insertRowPlan.getIndex());
// write vector
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- List<TSEncoding> encodings = new ArrayList<>();
- CompressionType compressionType = null;
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
- if (insertRowPlan.getMeasurements()[i] == null) {
+ if (insertRowPlan.getValues()[i] == null) {
continue;
}
IMeasurementSchema schema = insertRowPlan.getMeasurementMNodes()[i].getSchema();
- measurements.add(schema.getMeasurementId());
- types.add(schema.getType());
- encodings.add(schema.getEncodingType());
- compressionType = schema.getCompressor();
+ schemaList.add(schema);
+ dataTypes.add(schema.getType());
}
- if (measurements.isEmpty()) {
+ if (schemaList.isEmpty()) {
return;
}
- VectorMeasurementSchema vectorSchema =
- new VectorMeasurementSchema(
- AlignedPath.VECTOR_PLACEHOLDER,
- measurements.toArray(new String[measurements.size()]),
- types.toArray(new TSDataType[measurements.size()]),
- encodings.toArray(new TSEncoding[measurements.size()]),
- compressionType);
- memSize += MemUtils.getAlignedRecordSize(types, insertRowPlan.getValues(), disableMemControl);
+ memSize +=
+ MemUtils.getAlignedRecordsSize(dataTypes, insertRowPlan.getValues(), disableMemControl);
writeAlignedRow(
insertRowPlan.getDeviceId().getFullPath(),
- vectorSchema,
+ schemaList,
insertRowPlan.getTime(),
insertRowPlan.getValues());
totalPointsNum +=
@@ -224,92 +208,81 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public void write(
- String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue) {
- IWritableMemChunk memSeries = createMemChunkIfNotExistAndGet(deviceId, schema);
- memSeries.write(insertTime, objectValue);
+ String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue) {
+ IWritableMemChunkGroup memChunkGroup =
+ createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
+ memChunkGroup.write(insertTime, objectValue, schemaList);
}
@Override
public void writeAlignedRow(
- String deviceId, IMeasurementSchema schema, long insertTime, Object[] objectValue) {
- IWritableMemChunk memSeries = createAlignedMemChunkIfNotExistAndGet(deviceId, schema);
- memSeries.writeAlignedValue(insertTime, objectValue, schema);
+ String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue) {
+ IWritableMemChunkGroup memChunkGroup =
+ createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
+ memChunkGroup.write(insertTime, objectValue, schemaList);
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@Override
public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
- updatePlanIndexes(insertTabletPlan.getIndex());
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
if (insertTabletPlan.getColumns()[i] == null) {
continue;
}
- IWritableMemChunk memSeries =
- createMemChunkIfNotExistAndGet(
- insertTabletPlan.getDeviceId().getFullPath(),
- insertTabletPlan.getMeasurementMNodes()[i].getSchema());
- memSeries.write(
- insertTabletPlan.getTimes(),
- insertTabletPlan.getColumns()[i],
- insertTabletPlan.getBitMaps() != null ? insertTabletPlan.getBitMaps()[i] : null,
- insertTabletPlan.getDataTypes()[i],
- start,
- end);
+ IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[i].getSchema();
+ schemaList.add(schema);
}
+ IWritableMemChunkGroup memChunkGroup =
+ createMemChunkGroupIfNotExistAndGet(
+ insertTabletPlan.getDeviceId().getFullPath(), schemaList);
+ memChunkGroup.writeValues(
+ insertTabletPlan.getTimes(),
+ insertTabletPlan.getColumns(),
+ insertTabletPlan.getBitMaps(),
+ schemaList,
+ start,
+ end);
}
+ @Override
public void writeAlignedTablet(InsertTabletPlan insertTabletPlan, int start, int end) {
- updatePlanIndexes(insertTabletPlan.getIndex());
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- List<TSEncoding> encodings = new ArrayList<>();
- CompressionType compressionType = null;
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
if (insertTabletPlan.getColumns()[i] == null) {
continue;
}
IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[i].getSchema();
- measurements.add(schema.getMeasurementId());
- types.add(schema.getType());
- encodings.add(schema.getEncodingType());
- compressionType = schema.getCompressor();
+ schemaList.add(schema);
}
- if (measurements.isEmpty()) {
+ if (schemaList.isEmpty()) {
return;
}
- VectorMeasurementSchema vectorSchema =
- new VectorMeasurementSchema(
- AlignedPath.VECTOR_PLACEHOLDER,
- measurements.toArray(new String[measurements.size()]),
- types.toArray(new TSDataType[measurements.size()]),
- encodings.toArray(new TSEncoding[measurements.size()]),
- compressionType);
- IWritableMemChunk memSeries =
- createAlignedMemChunkIfNotExistAndGet(
- insertTabletPlan.getDeviceId().getFullPath(), vectorSchema);
- memSeries.writeAlignedValues(
+ IWritableMemChunkGroup memChunkGroup =
+ createAlignedMemChunkGroupIfNotExistAndGet(
+ insertTabletPlan.getDeviceId().getFullPath(), schemaList);
+ memChunkGroup.writeValues(
insertTabletPlan.getTimes(),
insertTabletPlan.getColumns(),
insertTabletPlan.getBitMaps(),
- vectorSchema,
+ schemaList,
start,
end);
}
@Override
public boolean checkIfChunkDoesNotExist(String deviceId, String measurement) {
- Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
- if (null == memSeries) {
+ IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+ if (null == memChunkGroup) {
return true;
}
- return !memSeries.containsKey(measurement);
+ return !memChunkGroup.contains(measurement);
}
@Override
- public int getCurrentChunkPointNum(String deviceId, String measurement) {
- Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
- IWritableMemChunk memChunk = memSeries.get(measurement);
- return (int) memChunk.count();
+ public long getCurrentChunkPointNum(String deviceId, String measurement) {
+ IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
+ return memChunkGroup.getCurrentChunkPointNum(measurement);
}
@Override
@@ -325,10 +298,8 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public long size() {
long sum = 0;
- for (Map<String, IWritableMemChunk> seriesMap : memTableMap.values()) {
- for (IWritableMemChunk writableMemChunk : seriesMap.values()) {
- sum += writableMemChunk.count();
- }
+ for (IWritableMemChunkGroup writableMemChunkGroup : memTableMap.values()) {
+ sum += writableMemChunkGroup.count();
}
return sum;
}
@@ -373,12 +344,13 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public void delete(
PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
- Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath());
- if (deviceMap == null) {
+ IWritableMemChunkGroup memChunkGroup = memTableMap.get(devicePath.getFullPath());
+ if (memChunkGroup == null) {
return;
}
- Iterator<Entry<String, IWritableMemChunk>> iter = deviceMap.entrySet().iterator();
+ Iterator<Entry<String, IWritableMemChunk>> iter =
+ memChunkGroup.getMemChunkMap().entrySet().iterator();
while (iter.hasNext()) {
Entry<String, IWritableMemChunk> entry = iter.next();
IWritableMemChunk chunk = entry.getValue();
@@ -433,13 +405,8 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public void release() {
- for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) {
- for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) {
- TVList list = subEntry.getValue().getTVList();
- if (list.getReferenceCount() == 0) {
- TVListAllocator.getInstance().release(list);
- }
- }
+ for (Entry<String, IWritableMemChunkGroup> entry : memTableMap.entrySet()) {
+ entry.getValue().release();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index cd7184b..69ebabb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -23,64 +23,71 @@ import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
public class AlignedWritableMemChunk implements IWritableMemChunk {
- private final VectorMeasurementSchema schema;
+ private final Map<String, Integer> measurementIndexMap;
+ private final List<IMeasurementSchema> schemaList;
private AlignedTVList list;
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
private static final Logger LOGGER = LoggerFactory.getLogger(AlignedWritableMemChunk.class);
- public AlignedWritableMemChunk(VectorMeasurementSchema schema) {
- this.schema = schema;
- this.list = TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList());
+ public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
+ this.measurementIndexMap = new LinkedHashMap<>();
+ List<TSDataType> dataTypeList = new ArrayList<>();
+ this.schemaList = schemaList;
+ for (int i = 0; i < schemaList.size(); i++) {
+ measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
+ dataTypeList.add(schemaList.get(i).getType());
+ }
+ this.list = TVListAllocator.getInstance().allocate(dataTypeList);
}
public boolean containsMeasurement(String measurementId) {
- return schema.containsSubMeasurement(measurementId);
+ return measurementIndexMap.containsKey(measurementId);
}
@Override
public void putLong(long t, long v) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putInt(long t, int v) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putFloat(long t, float v) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putDouble(long t, double v) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putBinary(long t, Binary v) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putBoolean(long t, boolean v) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
@@ -90,32 +97,32 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
@@ -126,19 +133,20 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public void write(long insertTime, Object objectValue) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
- public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) {
- int[] columnIndexArray = checkColumnsInInsertPlan(schema);
+ public void writeAlignedValue(
+ long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+ int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
putAlignedValue(insertTime, objectValue, columnIndexArray);
}
@Override
public void write(
long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
@@ -146,32 +154,29 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
long[] times,
Object[] valueList,
BitMap[] bitMaps,
- IMeasurementSchema schema,
+ List<IMeasurementSchema> schemaList,
int start,
int end) {
- int[] columnIndexArray = checkColumnsInInsertPlan(schema);
+ int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end);
}
- private int[] checkColumnsInInsertPlan(IMeasurementSchema schema) {
- VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema;
- List<String> measurementIdsInInsertPlan = vectorSchema.getSubMeasurementsList();
- List<TSDataType> dataTypesInInsertPlan = vectorSchema.getSubMeasurementsTSDataTypeList();
- List<TSEncoding> encodingsInInsertPlan = vectorSchema.getSubMeasurementsTSEncodingList();
- for (int i = 0; i < measurementIdsInInsertPlan.size(); i++) {
- if (!containsMeasurement(measurementIdsInInsertPlan.get(i))) {
- this.schema.addMeasurement(
- measurementIdsInInsertPlan.get(i),
- dataTypesInInsertPlan.get(i),
- encodingsInInsertPlan.get(i));
- this.list.extendColumn(dataTypesInInsertPlan.get(i));
+ private int[] checkColumnsInInsertPlan(List<IMeasurementSchema> schemaListInInsertPlan) {
+ List<String> measurementIdsInInsertPlan = new ArrayList<>();
+ for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
+ measurementIdsInInsertPlan.add(schemaListInInsertPlan.get(i).getMeasurementId());
+ if (!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) {
+ this.measurementIndexMap.put(
+ schemaListInInsertPlan.get(i).getMeasurementId(), measurementIndexMap.size());
+ this.schemaList.add(schemaListInInsertPlan.get(i));
+ this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
}
}
- List<String> measurementIdsInTVList = this.schema.getSubMeasurementsList();
- int[] columnIndexArray = new int[measurementIdsInTVList.size()];
- for (int i = 0; i < columnIndexArray.length; i++) {
- columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementIdsInTVList.get(i));
- }
+ int[] columnIndexArray = new int[measurementIndexMap.size()];
+ measurementIndexMap.forEach(
+ (measurementId, i) -> {
+ columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementId);
+ });
return columnIndexArray;
}
@@ -182,7 +187,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public long count() {
- return (long) list.size() * schema.getSubMeasurementsCount();
+ return (long) list.size() * measurementIndexMap.size();
}
public long alignedListSize() {
@@ -191,7 +196,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public IMeasurementSchema getSchema() {
- return schema;
+ return null;
}
@Override
@@ -209,7 +214,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
list.increaseReferenceCount();
List<Integer> columnIndexList = new ArrayList<>();
for (IMeasurementSchema measurementSchema : schemaList) {
- columnIndexList.add(schema.getSubMeasurementIndex(measurementSchema.getMeasurementId()));
+ columnIndexList.add(measurementIndexMap.get(measurementSchema.getMeasurementId()));
}
return list.getTvListByColumnIndex(columnIndexList);
}
@@ -243,7 +248,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public IChunkWriter createIChunkWriter() {
- return new AlignedChunkWriterImpl(schema);
+ return new AlignedChunkWriterImpl(schemaList);
}
@Override
@@ -310,4 +315,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
timeDuplicateAlignedRowIndexList = null;
}
}
+
+ @Override
+ public void release() {
+ if (list.getReferenceCount() == 0) {
+ TVListAllocator.getInstance().release(list);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
new file mode 100644
index 0000000..a15fb6f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -0,0 +1,62 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
+
+ private AlignedWritableMemChunk memChunk;
+
+ public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList) {
+ memChunk = new AlignedWritableMemChunk(schemaList);
+ }
+
+ @Override
+ public void writeValues(
+ long[] times,
+ Object[] columns,
+ BitMap[] bitMaps,
+ List<IMeasurementSchema> schemaList,
+ int start,
+ int end) {
+ memChunk.writeAlignedValues(times, columns, bitMaps, schemaList, start, end);
+ }
+
+ @Override
+ public void release() {
+ memChunk.release();
+ }
+
+ @Override
+ public long count() {
+ return memChunk.count();
+ }
+
+ @Override
+ public boolean contains(String measurement) {
+ return memChunk.containsMeasurement(measurement);
+ }
+
+ @Override
+ public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+ memChunk.writeAlignedValue(insertTime, objectValue, schemaList);
+ }
+
+ @Override
+ public Map<String, IWritableMemChunk> getMemChunkMap() {
+ return Collections.singletonMap("", memChunk);
+ }
+
+ @Override
+ public long getCurrentChunkPointNum(String measurement) {
+ return memChunk.count();
+ }
+
+ public AlignedWritableMemChunk getAlignedMemChunk() {
+ return memChunk;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 1dc2446..e05b1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -42,12 +42,13 @@ import java.util.Map;
*/
public interface IMemTable {
- Map<String, Map<String, IWritableMemChunk>> getMemTableMap();
+ Map<String, IWritableMemChunkGroup> getMemTableMap();
- void write(String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue);
+ void write(
+ String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue);
void writeAlignedRow(
- String deviceId, IMeasurementSchema schema, long insertTime, Object[] objectValue);
+ String deviceId, List<IMeasurementSchema> schemaList, long insertTime, Object[] objectValue);
/**
* write data in the range [start, end). Null value in each column values will be replaced by the
* subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
@@ -145,7 +146,7 @@ public interface IMemTable {
boolean checkIfChunkDoesNotExist(String deviceId, String measurement);
/** only used when mem control enabled */
- int getCurrentChunkPointNum(String deviceId, String measurement);
+ long getCurrentChunkPointNum(String deviceId, String measurement);
/** only used when mem control enabled */
void addTextDataSize(long textDataIncrement);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index ee10ea1..ce8b6f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -60,7 +60,8 @@ public interface IWritableMemChunk {
void write(long insertTime, Object objectValue);
- void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema);
+ void writeAlignedValue(
+ long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
/**
* write data in the range [start, end). Null value in the valueList will be replaced by the
@@ -73,7 +74,7 @@ public interface IWritableMemChunk {
long[] times,
Object[] valueList,
BitMap[] bitMaps,
- IMeasurementSchema schema,
+ List<IMeasurementSchema> schemaList,
int start,
int end);
@@ -129,4 +130,6 @@ public interface IWritableMemChunk {
IChunkWriter createIChunkWriter();
void encode(IChunkWriter chunkWriter);
+
+ void release();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
new file mode 100644
index 0000000..ffbd65c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
+
+public interface IWritableMemChunkGroup {
+
+ void writeValues(
+ long[] times,
+ Object[] columns,
+ BitMap[] bitMaps,
+ List<IMeasurementSchema> schemaList,
+ int start,
+ int end);
+
+ void release();
+
+ long count();
+
+ boolean contains(String measurement);
+
+ void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
+
+ Map<String, IWritableMemChunk> getMemChunkMap();
+
+ long getCurrentChunkPointNum(String measurement);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 6915967..425fc21 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.engine.memtable;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
-
import java.util.HashMap;
import java.util.Map;
@@ -33,23 +30,13 @@ public class PrimitiveMemTable extends AbstractMemTable {
this.disableMemControl = !enableMemControl;
}
- public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+ public PrimitiveMemTable(Map<String, IWritableMemChunkGroup> memTableMap) {
super(memTableMap);
}
@Override
- protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) {
- return new WritableMemChunk(schema);
- }
-
- @Override
- protected IWritableMemChunk genAlignedMemSeries(IMeasurementSchema schema) {
- return new AlignedWritableMemChunk((VectorMeasurementSchema) schema);
- }
-
- @Override
public IMemTable copy() {
- Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>(getMemTableMap());
+ Map<String, IWritableMemChunkGroup> newMap = new HashMap<>(getMemTableMap());
return new PrimitiveMemTable(newMap);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 3b096ad..51c7749 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -72,7 +72,8 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) {
+ public void writeAlignedValue(
+ long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
}
@@ -114,7 +115,7 @@ public class WritableMemChunk implements IWritableMemChunk {
long[] times,
Object[] valueList,
BitMap[] bitMaps,
- IMeasurementSchema schema,
+ List<IMeasurementSchema> schemaList,
int start,
int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
@@ -346,4 +347,11 @@ public class WritableMemChunk implements IWritableMemChunk {
}
}
}
+
+ @Override
+ public void release() {
+ // if (list.getReferenceCount() == 0) {
+ // TVListAllocator.getInstance().release(list);
+ // }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
new file mode 100644
index 0000000..5845da3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -0,0 +1,99 @@
+package org.apache.iotdb.db.engine.memtable;
+
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WritableMemChunkGroup implements IWritableMemChunkGroup {
+
+ private Map<String, IWritableMemChunk> memChunkMap;
+
+ public WritableMemChunkGroup(List<IMeasurementSchema> schemaList) {
+ memChunkMap = new HashMap<>();
+ for (IMeasurementSchema schema : schemaList) {
+ createMemChunkIfNotExistAndGet(schema);
+ }
+ }
+
+ @Override
+ public void writeValues(
+ long[] times,
+ Object[] columns,
+ BitMap[] bitMaps,
+ List<IMeasurementSchema> schemaList,
+ int start,
+ int end) {
+ int emptyColumnCount = 0;
+ for (int i = 0; i < columns.length; i++) {
+ if (columns[i] == null) {
+ emptyColumnCount++;
+ continue;
+ }
+ IWritableMemChunk memChunk =
+ createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount));
+ memChunk.write(
+ times,
+ columns[i],
+ bitMaps == null ? null : bitMaps[i],
+ schemaList.get(i - emptyColumnCount).getType(),
+ start,
+ end);
+ }
+ }
+
+ private IWritableMemChunk createMemChunkIfNotExistAndGet(IMeasurementSchema schema) {
+ return memChunkMap.computeIfAbsent(
+ schema.getMeasurementId(),
+ k -> {
+ return new WritableMemChunk(schema);
+ });
+ }
+
+ @Override
+ public void release() {
+ for (IWritableMemChunk memChunk : memChunkMap.values()) {
+ memChunk.release();
+ }
+ }
+
+ @Override
+ public long count() {
+ long count = 0;
+ for (IWritableMemChunk memChunk : memChunkMap.values()) {
+ count += memChunk.count();
+ }
+ return count;
+ }
+
+ @Override
+ public boolean contains(String measurement) {
+ return memChunkMap.containsKey(measurement);
+ }
+
+ @Override
+ public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+ int emptyColumnCount = 0;
+ for (int i = 0; i < objectValue.length; i++) {
+ if (objectValue[i] == null) {
+ emptyColumnCount++;
+ continue;
+ }
+ IWritableMemChunk memChunk =
+ createMemChunkIfNotExistAndGet(schemaList.get(i - emptyColumnCount));
+ memChunk.write(insertTime, objectValue[i]);
+ }
+ }
+
+ @Override
+ public Map<String, IWritableMemChunk> getMemChunkMap() {
+ return memChunkMap;
+ }
+
+ @Override
+ public long getCurrentChunkPointNum(String measurement) {
+ return memChunkMap.get(measurement).count();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 93d020f..5908597 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
@@ -355,7 +356,7 @@ public class TsFileProcessor {
memTableIncrement += TVList.tvListArrayMemSize(insertRowPlan.getDataTypes()[i]);
} else {
// here currentChunkPointNum >= 1
- int currentChunkPointNum =
+ long currentChunkPointNum =
workMemTable.getCurrentChunkPointNum(deviceId, insertRowPlan.getMeasurements()[i]);
memTableIncrement +=
(currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
@@ -378,7 +379,7 @@ public class TsFileProcessor {
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
- AlignedWritableMemChunk vectorMemChunk = null;
+ AlignedWritableMemChunk alignedMemChunk = null;
String deviceId = insertRowPlan.getDeviceId().getFullPath();
if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
// ChunkMetadataIncrement
@@ -388,15 +389,15 @@ public class TsFileProcessor {
memTableIncrement += AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes());
} else {
// here currentChunkPointNum >= 1
- int currentChunkPointNum =
+ long currentChunkPointNum =
workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
memTableIncrement +=
(currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
? AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes())
: 0;
- vectorMemChunk =
- ((AlignedWritableMemChunk)
- workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER));
+ alignedMemChunk =
+ ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId))
+ .getAlignedMemChunk();
}
for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
// skip failed Measurements
@@ -404,10 +405,10 @@ public class TsFileProcessor {
continue;
}
// extending the column of aligned mem chunk
- if (vectorMemChunk != null
- && !vectorMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
+ if (alignedMemChunk != null
+ && !alignedMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
memTableIncrement +=
- (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
+ (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
* insertRowPlan.getDataTypes()[i].getDataTypeSize();
}
// TEXT data mem size
@@ -486,13 +487,13 @@ public class TsFileProcessor {
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* TVList.tvListArrayMemSize(dataType);
} else {
- int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, measurement);
+ long currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, measurement);
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* TVList.tvListArrayMemSize(dataType);
} else {
- int acquireArray =
+ long acquireArray =
(end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
/ PrimitiveArrayManager.ARRAY_SIZE;
memIncrements[0] +=
@@ -526,7 +527,7 @@ public class TsFileProcessor {
* AlignedTVList.alignedTvListArrayMemSize(dataTypes);
} else {
int currentChunkPointNum =
- workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
+ (int) workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -541,8 +542,8 @@ public class TsFileProcessor {
: acquireArray * AlignedTVList.alignedTvListArrayMemSize(dataTypes);
}
vectorMemChunk =
- ((AlignedWritableMemChunk)
- workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER));
+ ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId))
+ .getAlignedMemChunk();
}
for (int i = 0; i < dataTypes.length; i++) {
TSDataType dataType = dataTypes[i];
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index ff51b18..f85338d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.db.metadata.path;
import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk;
@@ -334,14 +335,14 @@ public class AlignedPath extends PartialPath {
@Override
public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
- Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+ Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
throws QueryProcessException, IOException {
// check If memtable contains this path
if (!memTableMap.containsKey(getDevice())) {
return null;
}
AlignedWritableMemChunk alignedMemChunk =
- ((AlignedWritableMemChunk) memTableMap.get(getDevice()).get(VECTOR_PLACEHOLDER));
+ ((AlignedWritableMemChunkGroup) memTableMap.get(getDevice())).getAlignedMemChunk();
boolean containsMeasurement = false;
for (String measurement : measurementList) {
if (alignedMemChunk.containsMeasurement(measurement)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index d8aef64..d6900de 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.metadata.path;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -246,14 +247,15 @@ public class MeasurementPath extends PartialPath {
@Override
public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
- Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+ Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
throws QueryProcessException, IOException {
// check If Memtable Contains this path
if (!memTableMap.containsKey(getDevice())
- || !memTableMap.get(getDevice()).containsKey(getMeasurement())) {
+ || !memTableMap.get(getDevice()).contains(getMeasurement())) {
return null;
}
- IWritableMemChunk memChunk = memTableMap.get(getDevice()).get(getMeasurement());
+ IWritableMemChunk memChunk =
+ memTableMap.get(getDevice()).getMemChunkMap().get(getMeasurement());
// get sorted tv list is synchronized so different query can get right sorted list reference
TVList chunkCopy = memChunk.getSortedTvListForQuery();
int curSize = chunkCopy.size();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index 90581d6..ae42486 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.metadata.path;
-import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -423,7 +423,7 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
* @return ReadOnlyMemChunk
*/
public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
- Map<String, Map<String, IWritableMemChunk>> memTableMap, List<TimeRange> deletionList)
+ Map<String, IWritableMemChunkGroup> memTableMap, List<TimeRange> deletionList)
throws QueryProcessException, IOException {
throw new UnsupportedOperationException("Should call exact sub class!");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 5fb024b..c29ebc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -241,11 +241,7 @@ public class InsertRowPlan extends InsertPlan {
}
continue;
}
- if (isAligned) {
- dataTypes[i] = measurementMNodes[i].getSchema().getSubMeasurementsTSDataTypeList().get(i);
- } else {
- dataTypes[i] = measurementMNodes[i].getSchema().getType();
- }
+ dataTypes[i] = measurementMNodes[i].getSchema().getType();
try {
values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
} catch (Exception e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index e81a2ab..10617cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -56,10 +56,23 @@ public class MemUtils {
}
/**
+ * function for getting the value size. If mem control enabled, do not add text data size here,
+ * the size will be added to memtable before inserting.
+ */
+ public static long getRecordsSize(
+ List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
+ long memSize = 0L;
+ for (int i = 0; i < dataTypes.size(); i++) {
+ memSize += getRecordSize(dataTypes.get(i), value[i], addingTextDataSize);
+ }
+ return memSize;
+ }
+
+ /**
* function for getting the vector value size. If mem control enabled, do not add text data size
* here, the size will be added to memtable before inserting.
*/
- public static long getAlignedRecordSize(
+ public static long getAlignedRecordsSize(
List<TSDataType> dataTypes, Object[] value, boolean addingTextDataSize) {
// time and index size
long memSize = 8L + 4L;
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 7ded716..836fdfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.writelog.recover;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.engine.memtable.WritableMemChunk;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -126,11 +127,11 @@ public class LogReplayer {
}
}
- Map<String, Map<String, IWritableMemChunk>> memTableMap = recoverMemTable.getMemTableMap();
- for (Map.Entry<String, Map<String, IWritableMemChunk>> deviceEntry : memTableMap.entrySet()) {
+ Map<String, IWritableMemChunkGroup> memTableMap = recoverMemTable.getMemTableMap();
+ for (Map.Entry<String, IWritableMemChunkGroup> deviceEntry : memTableMap.entrySet()) {
String deviceId = deviceEntry.getKey();
for (Map.Entry<String, IWritableMemChunk> measurementEntry :
- deviceEntry.getValue().entrySet()) {
+ deviceEntry.getValue().getMemChunkMap().entrySet()) {
WritableMemChunk memChunk = (WritableMemChunk) measurementEntry.getValue();
currentTsFileResource.updateStartTime(deviceId, memChunk.getFirstPoint());
currentTsFileResource.updateEndTime(deviceId, memChunk.getLastPoint());
@@ -194,10 +195,19 @@ public class LogReplayer {
// mark failed plan manually
checkDataTypeAndMarkFailed(mNodes, plan);
if (plan instanceof InsertRowPlan) {
- recoverMemTable.insert((InsertRowPlan) plan);
+ if (plan.isAligned()) {
+ recoverMemTable.insertAlignedRow((InsertRowPlan) plan);
+ } else {
+ recoverMemTable.insert((InsertRowPlan) plan);
+ }
} else {
- recoverMemTable.insertTablet(
- (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+ if (plan.isAligned()) {
+ recoverMemTable.insertAlignedTablet(
+ (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+ } else {
+ recoverMemTable.insertTablet(
+ (InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount());
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
index 09b21c6..00a6fd1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableTestUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class MemTableTestUtils {
@@ -62,9 +63,10 @@ public class MemTableTestUtils {
for (long l = startTime; l <= endTime; l++) {
iMemTable.write(
deviceId,
- new UnaryMeasurementSchema(measurementId, dataType, TSEncoding.PLAIN),
+ Collections.singletonList(
+ new UnaryMeasurementSchema(measurementId, dataType, TSEncoding.PLAIN)),
l,
- (int) l);
+ new Object[] {(int) l});
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
index c11cd90..086a647 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+import java.util.Collections;
+
/** Memtable insert benchmark. Bench the Memtable and get its performance. */
public class MemtableBenchmark {
@@ -46,9 +48,10 @@ public class MemtableBenchmark {
for (int j = 0; j < numOfMeasurement; j++) {
memTable.write(
deviceId,
- new UnaryMeasurementSchema(measurementId[j], tsDataType, TSEncoding.PLAIN),
+ Collections.singletonList(
+ new UnaryMeasurementSchema(measurementId[j], tsDataType, TSEncoding.PLAIN)),
System.nanoTime(),
- String.valueOf(System.currentTimeMillis()));
+ new Object[] {String.valueOf(System.currentTimeMillis())});
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index a1c925f..6be17a0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -119,16 +119,18 @@ public class PrimitiveMemTableTest {
for (int i = 0; i < dataSize; i++) {
memTable.write(
deviceId,
- new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN),
+ Collections.singletonList(
+ new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN)),
dataSize - i - 1,
- i + 10);
+ new Object[] {i + 10});
}
for (int i = 0; i < dataSize; i++) {
memTable.write(
deviceId,
- new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN),
+ Collections.singletonList(
+ new UnaryMeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN)),
i,
- i);
+ new Object[] {i});
}
MeasurementPath fullPath =
new MeasurementPath(
@@ -163,9 +165,9 @@ public class PrimitiveMemTableTest {
for (TimeValuePair aRet : ret) {
memTable.write(
deviceId,
- new UnaryMeasurementSchema(sensorId, dataType, encoding),
+ Collections.singletonList(new UnaryMeasurementSchema(sensorId, dataType, encoding)),
aRet.getTimestamp(),
- aRet.getValue().getValue());
+ new Object[] {aRet.getValue().getValue()});
}
MeasurementPath fullPath =
new MeasurementPath(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 4c977ca..81999ed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -18,10 +18,13 @@
*/
package org.apache.iotdb.tsfile.write.chunk;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -63,6 +66,31 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
this.valueIndex = 0;
}
+ public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList) {
+ TSEncoding timeEncoding =
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ timeChunkWriter =
+ new TimeChunkWriter(
+ "",
+ schemaList.get(0).getCompressor(),
+ timeEncoding,
+ TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType));
+
+ valueChunkWriterList = new ArrayList<>(schemaList.size());
+ for (int i = 0; i < schemaList.size(); i++) {
+ valueChunkWriterList.add(
+ new ValueChunkWriter(
+ schemaList.get(i).getMeasurementId(),
+ schemaList.get(i).getCompressor(),
+ schemaList.get(i).getType(),
+ schemaList.get(i).getEncodingType(),
+ schemaList.get(i).getValueEncoder()));
+ }
+
+ this.valueIndex = 0;
+ }
+
public void write(long time, int value, boolean isNull) {
valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
}