You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/01 05:55:50 UTC
[iotdb] branch new_vector updated: remove more if type=vector
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_vector by this push:
new f0e4a8e remove more if type=vector
f0e4a8e is described below
commit f0e4a8e60e6d9f138b26f104cd5afcc24b805ec6
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Nov 1 13:54:51 2021 +0800
remove more if type=vector
---
.../db/engine/memtable/IWritableMemChunk.java | 2 +-
.../db/engine/memtable/VectorWritableMemChunk.java | 6 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 5 +-
.../db/engine/storagegroup/TsFileProcessor.java | 11 +-
.../apache/iotdb/db/rescon/TVListAllocator.java | 2 +-
.../iotdb/db/utils/datastructure/TVList.java | 65 ++--------
.../iotdb/db/utils/datastructure/VectorTVList.java | 136 ++++++++++++++++++---
7 files changed, 143 insertions(+), 84 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 62850ef..11e5ddd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -124,7 +124,7 @@ public interface IWritableMemChunk {
int delete(long lowerBound, long upperBound);
// For delete one column in the vector
- int delete(long lowerBound, long upperBound, int columnIndex);
+ int delete(long lowerBound, long upperBound, String measurementId);
IChunkWriter createIChunkWriter();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
index 047f048..c0d44bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
@@ -191,12 +191,12 @@ public class VectorWritableMemChunk implements IWritableMemChunk {
@Override
public int delete(long lowerBound, long upperBound) {
- // TODO Auto-generated method stub
- return 0;
+ return list.delete(lowerBound, upperBound);
}
@Override
- public int delete(long lowerBound, long upperBound, int columnIndex) {
+  // TODO: THIS METHOD IS FOR DELETING ONE COLUMN OF A VECTOR
+ public int delete(long lowerBound, long upperBound, String measurementId) {
// TODO Auto-generated method stub
return 0;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 4bfcee5..4ad3d81 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -257,10 +257,9 @@ public class WritableMemChunk implements IWritableMemChunk {
return list.delete(lowerBound, upperBound);
}
- // TODO: THIS METHOLD IS FOR DELETING ONE COLUMN OF A VECTOR
@Override
- public int delete(long lowerBound, long upperBound, int columnIndex) {
- return list.delete(lowerBound, upperBound, columnIndex);
+ public int delete(long lowerBound, long upperBound, String measurementId) {
+ throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 83f123c..3a81cfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.VectorTVList;
import org.apache.iotdb.db.writelog.WALFlushListener;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -385,13 +386,13 @@ public class TsFileProcessor {
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(
insertRowPlan.getMeasurements()[0], insertRowPlan.getDataTypes()[0]);
- memTableIncrement += TVList.vectorTvListArrayMemSize(insertRowPlan.getDataTypes());
+ memTableIncrement += VectorTVList.vectorTvListArrayMemSize(insertRowPlan.getDataTypes());
} else {
// here currentChunkPointNum >= 1
int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, null);
memTableIncrement +=
(currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
- ? TVList.vectorTvListArrayMemSize(insertRowPlan.getDataTypes())
+ ? VectorTVList.vectorTvListArrayMemSize(insertRowPlan.getDataTypes())
: 0;
}
for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
@@ -510,19 +511,19 @@ public class TsFileProcessor {
dataTypes.length * ChunkMetadata.calculateRamSize(measurementId, dataTypes[0]);
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
- * TVList.vectorTvListArrayMemSize(dataTypes);
+ * VectorTVList.vectorTvListArrayMemSize(dataTypes);
} else {
int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, null);
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
- * TVList.vectorTvListArrayMemSize(dataTypes);
+ * VectorTVList.vectorTvListArrayMemSize(dataTypes);
} else {
int acquireArray =
(end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
/ PrimitiveArrayManager.ARRAY_SIZE;
memIncrements[0] +=
- acquireArray == 0 ? 0 : acquireArray * TVList.vectorTvListArrayMemSize(dataTypes);
+ acquireArray == 0 ? 0 : acquireArray * VectorTVList.vectorTvListArrayMemSize(dataTypes);
}
}
// TEXT data size
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
index fab5fab..06902e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
@@ -58,7 +58,7 @@ public class TVListAllocator implements TVListAllocatorMBean, IService {
}
public synchronized VectorTVList allocate(List<TSDataType> dataTypes) {
- return TVList.newVectorList(dataTypes);
+ return VectorTVList.newVectorList(dataTypes);
}
/** For non-vector types. */
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 8276c9a..1a46af4 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -82,10 +82,6 @@ public abstract class TVList {
return null;
}
- public static VectorTVList newVectorList(List<TSDataType> datatypes) {
- return new VectorTVList(datatypes);
- }
-
public static long tvListArrayMemSize(TSDataType type) {
long size = 0;
// time size
@@ -95,25 +91,6 @@ public abstract class TVList {
return size;
}
- /**
- * For Vector data type.
- *
- * @param types the types in the vector
- * @return VectorTvListArrayMemSize
- */
- public static long vectorTvListArrayMemSize(TSDataType[] types) {
- long size = 0;
- // time size
- size += (long) PrimitiveArrayManager.ARRAY_SIZE * 8L;
- // index size
- size += (long) PrimitiveArrayManager.ARRAY_SIZE * 4L;
- // value size
- for (TSDataType type : types) {
- size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize();
- }
- return size;
- }
-
public boolean isSorted() {
return sorted;
}
@@ -287,17 +264,9 @@ public abstract class TVList {
releaseLastTimeArray();
releaseLastValueArray();
}
- if (getDataType() == TSDataType.VECTOR) {
- return deletedNumber * ((VectorTVList) this).getTsDataTypes().size();
- }
return deletedNumber;
}
- // TODO: THIS METHOLD IS FOR DELETING ONE COLUMN OF A VECTOR
- public int delete(long lowerBound, long upperBound, int columnIndex) {
- throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
- }
-
protected void cloneAs(TVList cloneList) {
for (long[] timestampArray : timestamps) {
cloneList.timestamps.add(cloneTime(timestampArray));
@@ -548,18 +517,18 @@ public abstract class TVList {
return new Ite(floatPrecision, encoding, size, deletionList);
}
- private class Ite implements IPointReader {
+ protected class Ite implements IPointReader {
- private TimeValuePair cachedTimeValuePair;
- private boolean hasCachedPair;
- private int cur;
- private Integer floatPrecision;
- private TSEncoding encoding;
+ protected TimeValuePair cachedTimeValuePair;
+ protected boolean hasCachedPair;
+ protected int cur;
+ protected Integer floatPrecision;
+ protected TSEncoding encoding;
private int deleteCursor = 0;
/**
* because TV list may be share with different query, each iterator has to record it's own size
*/
- private int iteSize = 0;
+ protected int iteSize = 0;
/** this field is effective only in the Tvlist in a RealOnlyMemChunk. */
private List<TimeRange> deletionList;
@@ -580,30 +549,14 @@ public abstract class TVList {
return true;
}
- List<Integer> timeDuplicatedVectorRowIndexList = null;
while (cur < iteSize) {
long time = getTime(cur);
if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) {
- // record the time duplicated row index list for vector type
- if (getDataType() == TSDataType.VECTOR) {
- if (timeDuplicatedVectorRowIndexList == null) {
- timeDuplicatedVectorRowIndexList = new ArrayList<>();
- timeDuplicatedVectorRowIndexList.add(getValueIndex(cur));
- }
- timeDuplicatedVectorRowIndexList.add(getValueIndex(cur + 1));
- }
cur++;
continue;
}
TimeValuePair tvPair;
- if (getDataType() == TSDataType.VECTOR && timeDuplicatedVectorRowIndexList != null) {
- tvPair =
- getTimeValuePairForTimeDuplicatedRows(
- timeDuplicatedVectorRowIndexList, time, floatPrecision, encoding);
- timeDuplicatedVectorRowIndexList = null;
- } else {
- tvPair = getTimeValuePair(cur, time, floatPrecision, encoding);
- }
+ tvPair = getTimeValuePair(cur, time, floatPrecision, encoding);
cur++;
if (tvPair.getValue() != null) {
cachedTimeValuePair = tvPair;
@@ -615,7 +568,7 @@ public abstract class TVList {
return false;
}
- private boolean isPointDeleted(long timestamp) {
+ protected boolean isPointDeleted(long timestamp) {
while (deletionList != null && deleteCursor < deletionList.size()) {
if (deletionList.get(deleteCursor).contains(timestamp)) {
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
index 84ea6bd..24ea2cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -20,9 +20,12 @@
package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -65,6 +68,10 @@ public class VectorTVList extends TVList {
}
}
+ public static VectorTVList newVectorList(List<TSDataType> datatypes) {
+ return new VectorTVList(datatypes);
+ }
+
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override
public void putVector(long timestamp, Object[] value, int[] columnOrder) {
@@ -340,6 +347,36 @@ public class VectorTVList extends TVList {
return dataTypes;
}
+ @Override
+ public int delete(long lowerBound, long upperBound) {
+ int newSize = 0;
+ minTime = Long.MAX_VALUE;
+ for (int i = 0; i < size; i++) {
+ long time = getTime(i);
+ if (time < lowerBound || time > upperBound) {
+ set(i, newSize++);
+ minTime = Math.min(time, minTime);
+ }
+ }
+ int deletedNumber = size - newSize;
+ size = newSize;
+ // release primitive arrays that are empty
+ int newArrayNum = newSize / ARRAY_SIZE;
+ if (newSize % ARRAY_SIZE != 0) {
+ newArrayNum++;
+ }
+ for (int releaseIdx = newArrayNum; releaseIdx < timestamps.size(); releaseIdx++) {
+ releaseLastTimeArray();
+ releaseLastValueArray();
+ }
+ return deletedNumber * getTsDataTypes().size();
+ }
+
+  // TODO: THIS METHOD IS FOR DELETING ONE COLUMN OF A VECTOR
+ public int delete(long lowerBound, long upperBound, int columnIndex) {
+ throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+ }
+
protected void set(int index, long timestamp, int value) {
int arrayIndex = index / ARRAY_SIZE;
int elementIndex = index % ARRAY_SIZE;
@@ -565,31 +602,19 @@ public class VectorTVList extends TVList {
@Override
public TimeValuePair getTimeValuePair(int index) {
- if (this.dataTypes.size() == 1) {
- return new TimeValuePair(getTime(index), ((TsPrimitiveType) getVector(index)).getVector()[0]);
- } else {
- return new TimeValuePair(getTime(index), (TsPrimitiveType) getVector(index));
- }
+ return new TimeValuePair(getTime(index), (TsPrimitiveType) getVector(index));
}
@Override
protected TimeValuePair getTimeValuePair(
int index, long time, Integer floatPrecision, TSEncoding encoding) {
- if (this.dataTypes.size() == 1) {
- return new TimeValuePair(time, ((TsPrimitiveType) getVector(index)).getVector()[0]);
- } else {
- return new TimeValuePair(time, (TsPrimitiveType) getVector(index));
- }
+ return new TimeValuePair(time, (TsPrimitiveType) getVector(index));
}
@Override
public TimeValuePair getTimeValuePairForTimeDuplicatedRows(
List<Integer> indexList, long time, Integer floatPrecision, TSEncoding encoding) {
- if (this.dataTypes.size() == 1) {
- return new TimeValuePair(time, getVector(indexList).getVector()[0]);
- } else {
- return new TimeValuePair(time, getVector(indexList));
- }
+ return new TimeValuePair(time, getVector(indexList));
}
@Override
@@ -718,4 +743,85 @@ public class VectorTVList extends TVList {
public TSDataType getDataType() {
return TSDataType.VECTOR;
}
+
+ /**
+   * Get the memory size of a single VectorTVList array for the given types.
+ *
+ * @param types the types in the vector
+ * @return VectorTvListArrayMemSize
+ */
+ public static long vectorTvListArrayMemSize(TSDataType[] types) {
+ long size = 0;
+ // time size
+ size += (long) PrimitiveArrayManager.ARRAY_SIZE * 8L;
+ // index size
+ size += (long) PrimitiveArrayManager.ARRAY_SIZE * 4L;
+ // value size
+ for (TSDataType type : types) {
+ size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize();
+ }
+ return size;
+ }
+
+ @Override
+ @TestOnly
+ public IPointReader getIterator() {
+ return new VectorIte();
+ }
+
+ @Override
+ public IPointReader getIterator(
+ int floatPrecision, TSEncoding encoding, int size, List<TimeRange> deletionList) {
+ return new VectorIte(floatPrecision, encoding, size, deletionList);
+ }
+
+ private class VectorIte extends Ite {
+
+ public VectorIte() {
+ super();
+ }
+
+ public VectorIte(
+ int floatPrecision, TSEncoding encoding, int size, List<TimeRange> deletionList) {
+ super(floatPrecision, encoding, size, deletionList);
+ }
+
+ @Override
+ public boolean hasNextTimeValuePair() {
+ if (hasCachedPair) {
+ return true;
+ }
+
+ List<Integer> timeDuplicatedVectorRowIndexList = null;
+ while (cur < iteSize) {
+ long time = getTime(cur);
+ if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) {
+ if (timeDuplicatedVectorRowIndexList == null) {
+ timeDuplicatedVectorRowIndexList = new ArrayList<>();
+ timeDuplicatedVectorRowIndexList.add(getValueIndex(cur));
+ }
+ timeDuplicatedVectorRowIndexList.add(getValueIndex(cur + 1));
+ cur++;
+ continue;
+ }
+ TimeValuePair tvPair;
+ if (timeDuplicatedVectorRowIndexList != null) {
+ tvPair =
+ getTimeValuePairForTimeDuplicatedRows(
+ timeDuplicatedVectorRowIndexList, time, floatPrecision, encoding);
+ timeDuplicatedVectorRowIndexList = null;
+ } else {
+ tvPair = getTimeValuePair(cur, time, floatPrecision, encoding);
+ }
+ cur++;
+ if (tvPair.getValue() != null) {
+ cachedTimeValuePair = tvPair;
+ hasCachedPair = true;
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
}