You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/14 02:59:29 UTC
[iotdb] branch vectorMemTable updated: change flush memtable from
by column to by row
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/vectorMemTable by this push:
new d6152e2 change flush memtable from by column to by row
d6152e2 is described below
commit d6152e26f1b73ca3e24f287e3666f5ce2fd32568
Author: HTHou <hh...@outlook.com>
AuthorDate: Sun Mar 14 10:58:07 2021 +0800
change flush memtable from by column to by row
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 88 +++++----------
.../db/engine/storagegroup/TsFileProcessor.java | 13 ++-
.../iotdb/db/utils/datastructure/VectorTVList.java | 122 ++++++++++++++++++++-
.../file/metadata/statistics/Statistics.java | 2 +
4 files changed, 162 insertions(+), 63 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 7f9c91f..c60ec2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -164,11 +164,6 @@ public class MemTableFlushTask {
private void writeOneSeries(
TVList tvPairs, IChunkWriter seriesWriterImpl, TSDataType dataType) {
- if (dataType == TSDataType.VECTOR) {
- writeOneVectorSeries(tvPairs, seriesWriterImpl);
- return;
- }
-
for (int i = 0; i < tvPairs.size(); i++) {
long time = tvPairs.getTime(i);
@@ -202,10 +197,35 @@ public class MemTableFlushTask {
seriesWriterImpl.write(time, tvPairs.getBinary(i), false);
break;
case VECTOR:
- // TODO:
- // for ( : tvPairs.getVector(i)) {
- // seriesWriterImpl.write(time, tvPairs.getVector(i)[], get);
- // }
+ VectorTVList vectorTVPairs = (VectorTVList) tvPairs;
+ List<TSDataType> dataTypes = vectorTVPairs.getTsDataTypes();
+ for (int j = 0; j < dataTypes.size(); j++) { // flush row i one column at a time
+ switch (dataTypes.get(j)) {
+ case BOOLEAN:
+ seriesWriterImpl.write(time, vectorTVPairs.getBoolean(i, j), false);
+ break;
+ case INT32:
+ seriesWriterImpl.write(time, vectorTVPairs.getInt(i, j), false);
+ break;
+ case INT64:
+ seriesWriterImpl.write(time, vectorTVPairs.getLong(i, j), false);
+ break;
+ case FLOAT:
+ seriesWriterImpl.write(time, vectorTVPairs.getFloat(i, j), false);
+ break;
+ case DOUBLE:
+ seriesWriterImpl.write(time, vectorTVPairs.getDouble(i, j), false);
+ break;
+ case TEXT:
+ seriesWriterImpl.write(time, vectorTVPairs.getBinary(i, j), false);
+ break;
+ default: // log the rejected column type; the method-level dataType is presumably VECTOR here
+ LOGGER.error(
+ "Storage group {} does not support data type: {}", storageGroup, dataTypes.get(j));
+ break;
+ }
+ }
+ seriesWriterImpl.write(time); // NOTE(review): presumably records the row timestamp once all columns are written - confirm against IChunkWriter
+ break;
default:
LOGGER.error(
@@ -215,56 +235,6 @@ public class MemTableFlushTask {
}
}
- private void writeOneVectorSeries(TVList tvPairs, IChunkWriter seriesWriterImpl) {
- VectorTVList tvList = (VectorTVList) tvPairs;
- List<TSDataType> dataTypes = tvList.getTsDataTypes();
- List<List<Object>> values = tvList.getValues();
- for (int i = 0; i < dataTypes.size(); i++) {
- List<Object> columnValues = values.get(i);
- for (int j = 0; j < tvList.size(); j++) {
- long time = tvList.getTime(j);
- // skip duplicated data
- if ((i + 1 < tvList.size() && (time == tvPairs.getTime(i + 1)))) {
- continue;
- }
- int valueIndex = tvList.getValueIndex(j);
- if (valueIndex >= tvList.size()) {
- throw new ArrayIndexOutOfBoundsException(valueIndex);
- }
- int arrayIndex = valueIndex / ARRAY_SIZE;
- int elementIndex = valueIndex % ARRAY_SIZE;
- switch (dataTypes.get(i)) {
- case TEXT:
- seriesWriterImpl.write(
- time, ((Binary[]) columnValues.get(arrayIndex))[elementIndex], false);
- break;
- case FLOAT:
- seriesWriterImpl.write(
- time, ((float[]) columnValues.get(arrayIndex))[elementIndex], false);
- break;
- case INT32:
- seriesWriterImpl.write(
- time, ((int[]) columnValues.get(arrayIndex))[elementIndex], false);
- break;
- case INT64:
- seriesWriterImpl.write(
- time, ((long[]) columnValues.get(arrayIndex))[elementIndex], false);
- break;
- case DOUBLE:
- seriesWriterImpl.write(
- time, ((double[]) columnValues.get(arrayIndex))[elementIndex], false);
- break;
- case BOOLEAN:
- seriesWriterImpl.write(
- time, ((boolean[]) columnValues.get(arrayIndex))[elementIndex], false);
- break;
- default:
- break;
- }
- }
- }
- }
-
@SuppressWarnings("squid:S135")
@Override
public void run() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1e705ab..6a4d9ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -290,9 +291,15 @@ public class TsFileProcessor {
}
if (workMemTable.checkIfChunkDoesNotExist(deviceId, insertRowPlan.getMeasurements()[i])) {
// ChunkMetadataIncrement
- chunkMetadataIncrement +=
- ChunkMetadata.calculateRamSize(
- insertRowPlan.getMeasurements()[i], insertRowPlan.getDataTypes()[i]);
+ if (insertRowPlan.getDataTypes()[i] == TSDataType.VECTOR) {
+ // TODO: insertRowPlan
+ // chunkMetadataIncrement += VectorChunkMetadata.calculateRamSize(insertRowPlan....);
+ }
+ else {
+ chunkMetadataIncrement +=
+ ChunkMetadata.calculateRamSize(
+ insertRowPlan.getMeasurements()[i], insertRowPlan.getDataTypes()[i]);
+ }
memTableIncrement += TVList.tvListArrayMemSize(insertRowPlan.getDataTypes()[i]);
} else {
// here currentChunkPointNum >= 1
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
index d54da4f..44b7952 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -150,6 +150,126 @@ public class VectorTVList extends TVList {
return TsPrimitiveType.getByType(TSDataType.VECTOR, vector);
}
+ public int getInt(int index, int column) {
+ if (index >= size) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ int valueIndex = indices.get(arrayIndex)[elementIndex];
+ return getIntByValueIndex(valueIndex, column);
+ }
+
+ private int getIntByValueIndex(int valueIndex, int column) {
+ if (valueIndex >= size) {
+ throw new ArrayIndexOutOfBoundsException(valueIndex);
+ }
+ int arrayIndex = valueIndex / ARRAY_SIZE;
+ int elementIndex = valueIndex % ARRAY_SIZE;
+ List<Object> columnValues = values.get(column);
+ return ((int[]) columnValues.get(arrayIndex))[elementIndex];
+ }
+
+ public long getLong(int index, int column) {
+ if (index >= size) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ int valueIndex = indices.get(arrayIndex)[elementIndex];
+ return getLongByValueIndex(valueIndex, column);
+ }
+
+ private long getLongByValueIndex(int valueIndex, int column) {
+ if (valueIndex >= size) {
+ throw new ArrayIndexOutOfBoundsException(valueIndex);
+ }
+ int arrayIndex = valueIndex / ARRAY_SIZE;
+ int elementIndex = valueIndex % ARRAY_SIZE;
+ List<Object> columnValues = values.get(column);
+ return ((long[]) columnValues.get(arrayIndex))[elementIndex];
+ }
+
+ public float getFloat(int index, int column) {
+ if (index >= size) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ int valueIndex = indices.get(arrayIndex)[elementIndex];
+ return getFloatByValueIndex(valueIndex, column);
+ }
+
+ private float getFloatByValueIndex(int valueIndex, int column) {
+ if (valueIndex >= size) {
+ throw new ArrayIndexOutOfBoundsException(valueIndex);
+ }
+ int arrayIndex = valueIndex / ARRAY_SIZE;
+ int elementIndex = valueIndex % ARRAY_SIZE;
+ List<Object> columnValues = values.get(column);
+ return ((float[]) columnValues.get(arrayIndex))[elementIndex];
+ }
+
+ public double getDouble(int index, int column) {
+ if (index >= size) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ int valueIndex = indices.get(arrayIndex)[elementIndex];
+ return getDoubleByValueIndex(valueIndex, column);
+ }
+
+ private double getDoubleByValueIndex(int valueIndex, int column) { // reads one double from the given column at a physical value index
+ if (valueIndex >= size) {
+ throw new ArrayIndexOutOfBoundsException(valueIndex);
+ }
+ int arrayIndex = valueIndex / ARRAY_SIZE; // which fixed-size chunk holds the value
+ int elementIndex = valueIndex % ARRAY_SIZE; // offset inside that chunk
+ List<Object> columnValues = values.get(column);
+ return ((double[]) columnValues.get(arrayIndex))[elementIndex]; // chunks are primitive double[]; a Double[] cast throws ClassCastException
+ }
+
+ public Binary getBinary(int index, int column) {
+ if (index >= size) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ int valueIndex = indices.get(arrayIndex)[elementIndex];
+ return getBinaryByValueIndex(valueIndex, column);
+ }
+
+ private Binary getBinaryByValueIndex(int valueIndex, int column) {
+ if (valueIndex >= size) {
+ throw new ArrayIndexOutOfBoundsException(valueIndex);
+ }
+ int arrayIndex = valueIndex / ARRAY_SIZE;
+ int elementIndex = valueIndex % ARRAY_SIZE;
+ List<Object> columnValues = values.get(column);
+ return ((Binary[]) columnValues.get(arrayIndex))[elementIndex];
+ }
+
+ public boolean getBoolean(int index, int column) {
+ if (index >= size) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ int valueIndex = indices.get(arrayIndex)[elementIndex];
+ return getBooleanByValueIndex(valueIndex, column);
+ }
+
+ private boolean getBooleanByValueIndex(int valueIndex, int column) {
+ if (valueIndex >= size) {
+ throw new ArrayIndexOutOfBoundsException(valueIndex);
+ }
+ int arrayIndex = valueIndex / ARRAY_SIZE;
+ int elementIndex = valueIndex % ARRAY_SIZE;
+ List<Object> columnValues = values.get(column);
+ return ((boolean[]) columnValues.get(arrayIndex))[elementIndex];
+ }
+
public List<List<Object>> getValues() {
return values;
}
@@ -334,7 +454,7 @@ public class VectorTVList extends TVList {
@Override
protected TimeValuePair getTimeValuePair(
int index, long time, Integer floatPrecision, TSEncoding encoding) {
- return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.VECTOR, getBinary(index)));
+ return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.VECTOR, getVector(index)));
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 759a751..cd1abe1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -98,6 +98,8 @@ public abstract class Statistics<T> {
return DoubleStatistics.DOUBLE_STATISTICS_FIXED_RAM_SIZE;
case FLOAT:
return FloatStatistics.FLOAT_STATISTICS_FIXED_RAM_SIZE;
+ case VECTOR:
+ return TimeStatistics.TIME_STATISTICS_FIXED_RAM_SIZE;
default:
throw new UnknownColumnTypeException(type.toString());
}