You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/05 08:16:01 UTC
[iotdb] 01/02: Support extend column
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1c79368db9993780c126ff606cc03a659774a6aa
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Nov 5 14:21:01 2021 +0800
Support extend column
---
.../engine/memtable/AlignedWritableMemChunk.java | 47 ++++++++++---------
.../db/engine/querycontext/ReadOnlyMemChunk.java | 2 +-
.../db/engine/storagegroup/TsFileProcessor.java | 53 +++++++++++++++++-----
.../apache/iotdb/db/metadata/path/AlignedPath.java | 6 +--
.../db/utils/datastructure/AlignedTVList.java | 48 +++++++++++---------
.../iotdb/db/utils/datastructure/TVList.java | 2 +-
.../write/schema/VectorMeasurementSchema.java | 16 +++++++
7 files changed, 116 insertions(+), 58 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index 1315a4c..90875a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -5,6 +5,7 @@ import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
@@ -16,29 +17,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
public class AlignedWritableMemChunk implements IWritableMemChunk {
- private IMeasurementSchema schema;
+ private VectorMeasurementSchema schema;
private AlignedTVList list;
- private Map<String, Integer> alignedMeasurementIndexMap;
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
private static final Logger LOGGER = LoggerFactory.getLogger(AlignedWritableMemChunk.class);
public AlignedWritableMemChunk(VectorMeasurementSchema schema) {
this.schema = schema;
- alignedMeasurementIndexMap = new HashMap<>();
- for (int i = 0; i < schema.getSubMeasurementsCount(); i++) {
- alignedMeasurementIndexMap.put(schema.getSubMeasurementsList().get(i), i);
- }
this.list = TVListAllocator.getInstance().allocate(schema.getSubMeasurementsTSDataTypeList());
}
public boolean containsMeasurement(String measurementId) {
- return alignedMeasurementIndexMap.containsKey(measurementId);
+ return schema.containsSubMeasurement(measurementId);
}
@Override
@@ -72,8 +66,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
}
@Override
- public void putAlignedValue(long t, Object[] v, int[] columnOrder) {
- list.putAlignedValue(t, v, columnOrder);
+ public void putAlignedValue(long t, Object[] v, int[] columnIndexArray) {
+ list.putAlignedValue(t, v, columnIndexArray);
}
@Override
@@ -108,8 +102,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public void putAlignedValues(
- long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndex, int start, int end) {
- list.putAlignedValues(t, v, bitMaps, columnIndex, start, end);
+ long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) {
+ list.putAlignedValues(t, v, bitMaps, columnIndexArray, start, end);
}
@Override
@@ -119,7 +113,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public void writeAlignedValue(long insertTime, Object[] objectValue, IMeasurementSchema schema) {
- int[] columnIndexArray = checkColumnOrder(schema);
+ int[] columnIndexArray = checkColumnsInInsertPlan(schema);
putAlignedValue(insertTime, objectValue, columnIndexArray);
}
@@ -137,19 +131,35 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
IMeasurementSchema schema,
int start,
int end) {
- int[] columnIndexArray = checkColumnOrder(schema);
+ int[] columnIndexArray = checkColumnsInInsertPlan(schema);
putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end);
}
- private int[] checkColumnOrder(IMeasurementSchema schema) {
+ /**
+ * Adds every sub-measurement of the insert plan that this chunk does not contain yet, then maps
+ * each column of the TV list to its position in the insert plan (-1 if the plan lacks it).
+ */
+ private int[] checkColumnsInInsertPlan(IMeasurementSchema schema) {
VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema;
List<String> measurementIdsInInsertPlan = vectorSchema.getSubMeasurementsList();
+ List<TSDataType> dataTypesInInsertPlan = vectorSchema.getSubMeasurementsTSDataTypeList();
+ List<TSEncoding> encodingsInInsertPlan = vectorSchema.getSubMeasurementsTSEncodingList();
+ // Extend first: otherwise the index array below is shorter than the extended TV list and the
+ // values of the newly added columns never reach it.
+ for (int i = 0; i < measurementIdsInInsertPlan.size(); i++) {
+ String measurementId = measurementIdsInInsertPlan.get(i);
+ TSDataType dataType = dataTypesInInsertPlan.get(i);
+ // defensive: failed measurements show up as null ids/types elsewhere in the write path
+ if (measurementId != null && dataType != null && !containsMeasurement(measurementId)) {
+ this.schema.addSubMeasurement(measurementId, dataType, encodingsInInsertPlan.get(i));
+ this.list.extendColumn(dataType);
+ }
+ }
- List<String> measurementIdsInTVList =
- ((VectorMeasurementSchema) this.schema).getSubMeasurementsList();
+ List<String> measurementIdsInTVList = this.schema.getSubMeasurementsList();
int[] columnIndexArray = new int[measurementIdsInTVList.size()];
for (int i = 0; i < columnIndexArray.length; i++) {
columnIndexArray[i] = measurementIdsInInsertPlan.indexOf(measurementIdsInTVList.get(i));
}
return columnIndexArray;
}
@@ -160,7 +170,13 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
@Override
public long count() {
- return list.size() * alignedMeasurementIndexMap.size();
+ // widen before multiplying: int * int can overflow for large chunks
+ return (long) list.size() * schema.getSubMeasurementsCount();
+ }
+
+ /** Returns the number of rows currently stored in the aligned TV list. */
+ public long alignedListSize() {
+ return list.size();
}
@Override
@@ -182,13 +198,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
// increase reference count
list.increaseReferenceCount();
List<Integer> columnIndexList = new ArrayList<>();
- List<TSDataType> dataTypeList = new ArrayList<>();
for (IMeasurementSchema measurementSchema : schemaList) {
columnIndexList.add(
- alignedMeasurementIndexMap.getOrDefault(measurementSchema.getMeasurementId(), -1));
- dataTypeList.add(measurementSchema.getType());
+ schema.getSubMeasurementIndex(measurementSchema.getMeasurementId()));
}
- return list.getTvListByColumnIndex(columnIndexList, dataTypeList);
+ return list.getTvListByColumnIndex(columnIndexList);
}
private void sortTVList() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index f0fdbeb..26492a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -210,7 +210,9 @@ public class ReadOnlyMemChunk {
IMeasurementSchema schema, Statistics[] valueStatistics, TimeValuePair timeValuePair)
throws QueryProcessException {
for (int i = 0; i < schema.getSubMeasurementsTSDataTypeList().size(); i++) {
- if (timeValuePair.getValue().getVector()[i] == null) {
+ // skip columns without a value, including rows whose whole vector is null
+ if (timeValuePair.getValue().getVector() == null
+ || timeValuePair.getValue().getVector()[i] == null) {
continue;
}
switch (schema.getSubMeasurementsTSDataTypeList().get(i)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b50c904..80fb3c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable;
+import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
@@ -39,6 +40,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -378,26 +380,40 @@ public class TsFileProcessor {
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
+ AlignedWritableMemChunk vectorMemChunk = null;
String deviceId = insertRowPlan.getPrefixPath().getFullPath();
- if (workMemTable.checkIfChunkDoesNotExist(deviceId, null)) {
+ if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
// ChunkMetadataIncrement
chunkMetadataIncrement +=
- ChunkMetadata.calculateRamSize(
- insertRowPlan.getMeasurements()[0], insertRowPlan.getDataTypes()[0]);
+ ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
+ * insertRowPlan.getDataTypes().length;
memTableIncrement += AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes());
} else {
// here currentChunkPointNum >= 1
- int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, null);
+ int currentChunkPointNum =
+ workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
memTableIncrement +=
(currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
? AlignedTVList.alignedTvListArrayMemSize(insertRowPlan.getDataTypes())
: 0;
+ vectorMemChunk =
+ ((AlignedWritableMemChunk)
+ workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER));
}
for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
// skip failed Measurements
if (insertRowPlan.getDataTypes()[i] == null || insertRowPlan.getMeasurements()[i] == null) {
continue;
}
+ // a column new to the existing aligned chunk needs one value array of ARRAY_SIZE elements per
+ // time array of the chunk (estimated as alignedListSize / ARRAY_SIZE + 1)
+ if (vectorMemChunk != null
+ && !vectorMemChunk.containsMeasurement(insertRowPlan.getMeasurements()[i])) {
+ memTableIncrement +=
+ (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
+ * PrimitiveArrayManager.ARRAY_SIZE
+ * insertRowPlan.getDataTypes()[i].getDataTypeSize();
+ }
// TEXT data mem size
if (insertRowPlan.getDataTypes()[i] == TSDataType.TEXT) {
textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
@@ -445,7 +461,7 @@ public class TsFileProcessor {
updateAlignedMemCost(
insertTabletPlan.getDataTypes(),
deviceId,
- insertTabletPlan.getMeasurements()[0],
+ insertTabletPlan.getMeasurements(),
start,
end,
memIncrements,
@@ -497,21 +513,24 @@ public class TsFileProcessor {
private void updateAlignedMemCost(
TSDataType[] dataTypes,
String deviceId,
- String measurementId,
+ String[] measurementIds,
int start,
int end,
long[] memIncrements,
Object[] columns) {
+ AlignedWritableMemChunk vectorMemChunk = null;
// memIncrements = [memTable, text, chunk metadata] respectively
- if (workMemTable.checkIfChunkDoesNotExist(deviceId, null)) {
+ if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
// ChunkMetadataIncrement
memIncrements[2] +=
- dataTypes.length * ChunkMetadata.calculateRamSize(measurementId, dataTypes[0]);
+ dataTypes.length
+ * ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR);
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* AlignedTVList.alignedTvListArrayMemSize(dataTypes);
} else {
- int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, null);
+ int currentChunkPointNum =
+ workMemTable.getCurrentChunkPointNum(deviceId, AlignedPath.VECTOR_PLACEHOLDER);
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -525,10 +544,27 @@ public class TsFileProcessor {
? 0
: acquireArray * AlignedTVList.alignedTvListArrayMemSize(dataTypes);
}
+ vectorMemChunk =
+ ((AlignedWritableMemChunk)
+ workMemTable.getMemTableMap().get(deviceId).get(AlignedPath.VECTOR_PLACEHOLDER));
}
- // TEXT data size
for (int i = 0; i < dataTypes.length; i++) {
- if (dataTypes[i] == TSDataType.TEXT) {
+ TSDataType dataType = dataTypes[i];
+ String measurement = measurementIds[i];
+ Object column = columns[i];
+ if (dataType == null || column == null || measurement == null) {
+ continue;
+ }
+ // a column new to the existing aligned chunk needs one value array of ARRAY_SIZE elements per
+ // time array of the chunk (estimated as alignedListSize / ARRAY_SIZE + 1)
+ if (vectorMemChunk != null && !vectorMemChunk.containsMeasurement(measurement)) {
+ memIncrements[0] +=
+ (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
+ * PrimitiveArrayManager.ARRAY_SIZE
+ * dataType.getDataTypeSize();
+ }
+ // TEXT data size
+ if (dataType == TSDataType.TEXT) {
- Binary[] binColumn = (Binary[]) columns[i];
+ Binary[] binColumn = (Binary[]) column;
memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 43397ba..5a0a68e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -334,11 +334,11 @@ public class AlignedPath extends PartialPath {
if (!memTableMap.containsKey(getDevice())) {
return null;
}
- AlignedWritableMemChunk vectorMemChunk =
+ AlignedWritableMemChunk alignedMemChunk =
((AlignedWritableMemChunk) memTableMap.get(getDevice()).get(VECTOR_PLACEHOLDER));
boolean containsMeasurement = false;
for (String measurement : measurementList) {
- if (vectorMemChunk.containsMeasurement(measurement)) {
+ if (alignedMemChunk.containsMeasurement(measurement)) {
containsMeasurement = true;
break;
}
@@ -347,7 +347,7 @@ public class AlignedPath extends PartialPath {
return null;
}
// get sorted tv list is synchronized so different query can get right sorted list reference
- TVList vectorTvListCopy = vectorMemChunk.getSortedTvListForQuery(schemaList);
- int curSize = vectorTvListCopy.size();
- return new ReadOnlyMemChunk(getMeasurementSchema(), vectorTvListCopy, curSize, deletionList);
+ TVList alignedTvListCopy = alignedMemChunk.getSortedTvListForQuery(schemaList);
+ int curSize = alignedTvListCopy.size();
+ return new ReadOnlyMemChunk(getMeasurementSchema(), alignedTvListCopy, curSize, deletionList);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index a3f959f..f2a5d76 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -168,15 +168,17 @@ public class AlignedTVList extends TVList {
TsPrimitiveType[] vector = new TsPrimitiveType[values.size()];
for (int columnIndex = 0; columnIndex < values.size(); columnIndex++) {
List<Object> columnValues = values.get(columnIndex);
- if (validIndexesForTimeDuplicatedRows != null) {
- arrayIndex = validIndexesForTimeDuplicatedRows[columnIndex] / ARRAY_SIZE;
- elementIndex = validIndexesForTimeDuplicatedRows[columnIndex] % ARRAY_SIZE;
- }
- if (bitMaps != null
+ // a null column stands for a projected column that does not exist in the source list
+ if (columnValues == null
+ || (bitMaps != null
&& bitMaps.get(columnIndex) != null
- && isValueMarked(valueIndex, columnIndex)) {
+ && isValueMarked(valueIndex, columnIndex))) {
continue;
}
+ if (validIndexesForTimeDuplicatedRows != null) {
+ arrayIndex = validIndexesForTimeDuplicatedRows[columnIndex] / ARRAY_SIZE;
+ elementIndex = validIndexesForTimeDuplicatedRows[columnIndex] % ARRAY_SIZE;
+ }
switch (dataTypes.get(columnIndex)) {
case TEXT:
Binary valueT = ((Binary[]) columnValues.get(arrayIndex))[elementIndex];
@@ -222,22 +224,15 @@ public class AlignedTVList extends TVList {
}
@Override
- public TVList getTvListByColumnIndex(List<Integer> columnIndex, List<TSDataType> dataTypes) {
+ public TVList getTvListByColumnIndex(List<Integer> columnIndex) {
List<TSDataType> types = new ArrayList<>();
List<List<Object>> values = new ArrayList<>();
List<List<BitMap>> bitMaps = null;
for (int i = 0; i < columnIndex.size(); i++) {
- // columnIndex == -1 means querying a non-exist column, generate empty column here
+ // columnIndex == -1 means querying a non-exist column, add null column here
if (columnIndex.get(i) == -1) {
- types.add(dataTypes.get(i));
- // use bitmap to mark as null value
- if (bitMaps == null) {
- bitMaps = new ArrayList<>(columnIndex.size());
- for (int j = 0; j < columnIndex.size(); j++) {
- bitMaps.add(null);
- }
- }
- generateEmptyColumn(dataTypes.get(i), values, bitMaps);
+ types.add(null);
+ values.add(null);
} else {
types.add(this.dataTypes.get(columnIndex.get(i)));
values.add(this.values.get(columnIndex.get(i)));
@@ -261,8 +256,17 @@ public class AlignedTVList extends TVList {
return alignedTvList;
}
- private void generateEmptyColumn(
- TSDataType dataType, List<List<Object>> values, List<List<BitMap>> bitMaps) {
+ /**
+ * Appends a column of {@code dataType}, marking every row already stored as null in its bitmaps.
+ */
+ public void extendColumn(TSDataType dataType) {
+ if (bitMaps == null) {
+ // no column has a bitmap yet: add one null placeholder per existing column
+ bitMaps = new ArrayList<>(values.size() + 1);
+ for (int i = 0; i < values.size(); i++) {
+ bitMaps.add(null);
+ }
+ }
List<Object> columnValue = new ArrayList<>();
List<BitMap> columnBitMaps = new ArrayList<>();
for (int i = 0; i < timestamps.size(); i++) {
@@ -289,12 +293,21 @@ public class AlignedTVList extends TVList {
break;
}
BitMap bitMap = new BitMap(ARRAY_SIZE);
- bitMap.markAll();
+ // only rows below `size` exist yet, so only they are marked as null; the remaining slots of
+ // the last array are left unmarked
+ int storedRowsInArray = Math.max(0, Math.min(ARRAY_SIZE, size - i * ARRAY_SIZE));
+ if (storedRowsInArray == ARRAY_SIZE) {
+ bitMap.markAll();
+ } else {
+ for (int j = 0; j < storedRowsInArray; j++) {
+ bitMap.mark(j);
+ }
+ }
columnBitMaps.add(bitMap);
}
- // values.size() is the index of column
- bitMaps.set(values.size(), columnBitMaps);
- values.add(columnValue);
+ // append the new column as the last entry of both bitMaps and values
+ this.bitMaps.add(columnBitMaps);
+ this.values.add(columnValue);
}
/**
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 25c6aa2..8f1536b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -207,7 +207,7 @@ public abstract class TVList {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
- public TVList getTvListByColumnIndex(List<Integer> columnIndexList, List<TSDataType> dataTypes) {
+ public TVList getTvListByColumnIndex(List<Integer> columnIndexList) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index d2e97cd..fb4a75f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -223,6 +223,26 @@ public class VectorMeasurementSchema
return subMeasurementsToIndexMap.containsKey(subMeasurement);
}
+ /**
+ * Appends a sub-measurement as the last column of this schema. An id that is already present is
+ * left unchanged.
+ */
+ public void addSubMeasurement(String measurementId, TSDataType dataType, TSEncoding encoding) {
+ if (subMeasurementsToIndexMap.containsKey(measurementId)) {
+ return;
+ }
+ subMeasurementsToIndexMap.put(measurementId, subMeasurementsToIndexMap.size());
+ // grow the per-column arrays by one slot; copy only the OLD length of each array
+ byte[] typesInByte = new byte[types.length + 1];
+ System.arraycopy(types, 0, typesInByte, 0, types.length);
+ typesInByte[types.length] = dataType.serialize();
+ this.types = typesInByte;
+ byte[] encodingsInByte = new byte[encodings.length + 1];
+ System.arraycopy(encodings, 0, encodingsInByte, 0, encodings.length);
+ encodingsInByte[encodings.length] = encoding.serialize();
+ this.encodings = encodingsInByte;
+ }
+
@Override
public int serializeTo(ByteBuffer buffer) {
int byteLen = 0;