You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 09:39:21 UTC
[iotdb] 01/04: memtable
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eb42aede25ab35a0a43d9e9ad3d14aa7a124765f
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 13:55:58 2021 +0800
memtable
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 6 +
.../iotdb/db/rescon/PrimitiveArrayManager.java | 9 +-
.../iotdb/db/utils/datastructure/TVList.java | 14 ++
.../iotdb/db/utils/datastructure/VectorTVList.java | 230 +++++++++++++++++++++
.../db/utils/datastructure/VectorTVListTest.java | 67 ++++++
.../tsfile/file/metadata/enums/TSDataType.java | 6 +-
.../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 62 ++++++
7 files changed, 392 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..dcd82ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -190,6 +190,12 @@ public class MemTableFlushTask {
case TEXT:
seriesWriterImpl.write(time, tvPairs.getBinary(i));
break;
+ case VECTOR:
+ // TODO:
+// for ( : tvPairs.getVector(i)) {
+// seriesWriterImpl.write(time, tvPairs.getVector(i)[], get);
+// }
+ break;
default:
LOGGER.error(
"Storage group {} does not support data type: {}", storageGroup, dataType);
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index aa6c264..b471332 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
-
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +71,7 @@ public class PrimitiveArrayManager {
bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>());
+ bufferedArraysMap.put(TSDataType.VECTOR, new ArrayDeque<>());
}
private PrimitiveArrayManager() {
@@ -127,6 +128,9 @@ public class PrimitiveArrayManager {
case TEXT:
dataArray = new Binary[ARRAY_SIZE];
break;
+ case VECTOR:
+ dataArray = new byte[ARRAY_SIZE][];
+ break;
default:
throw new UnSupportedDataTypeException(dataType.toString());
}
@@ -205,6 +209,9 @@ public class PrimitiveArrayManager {
} else if (dataArray instanceof Binary[]) {
Arrays.fill((Binary[]) dataArray, null);
dataType = TSDataType.TEXT;
+ } else if (dataArray instanceof TsPrimitiveType[][]) {
+ Arrays.fill((TsPrimitiveType[][]) dataArray, null);
+ dataType = TSDataType.VECTOR;
} else {
throw new UnSupportedDataTypeException("Unknown data array type");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d24beae..3e0ef74 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -73,6 +73,8 @@ public abstract class TVList {
return new DoubleTVList();
case BOOLEAN:
return new BooleanTVList();
+ case VECTOR:
+ return new VectorTVList();
default:
break;
}
@@ -137,6 +139,10 @@ public abstract class TVList {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
+ /**
+ * Appends one vector point. This base implementation always throws; {@code VectorTVList}
+ * overrides it with a real implementation.
+ *
+ * @param time timestamp of the point
+ * @param value the vector payload as a byte array
+ * @throws UnsupportedOperationException always, in this base implementation
+ */
+ public void putVector(long time, byte[] value) {
+ throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+ }
+
public void putLongs(long[] time, long[] value, int start, int end) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
@@ -161,6 +167,10 @@ public abstract class TVList {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
+ /**
+ * Appends a batch of vector points. This base implementation always throws;
+ * {@code VectorTVList} overrides it with a real implementation.
+ *
+ * @param time timestamps of the points
+ * @param value vector payloads, one byte array per point
+ * @param start first input index (the override treats it as inclusive)
+ * @param end last input index (the override treats it as exclusive)
+ * @throws UnsupportedOperationException always, in this base implementation
+ */
+ public void putVectors(long[] time, byte[][] value, int start, int end) {
+ throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+ }
+
public long getLong(int index) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
@@ -185,6 +195,10 @@ public abstract class TVList {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
+ /**
+ * Returns the vector stored at {@code index}. This base implementation always throws;
+ * {@code VectorTVList} overrides it with a real implementation.
+ *
+ * @param index position of the point in this list
+ * @return never returns normally in this base implementation
+ * @throws UnsupportedOperationException always, in this base implementation
+ */
+ public byte[] getVector(int index) {
+ throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+ }
+
public abstract void sort();
public long getMinTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
new file mode 100644
index 0000000..e0a652a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
+
+/**
+ * A {@link TVList} whose values are vectors, each stored as one {@code byte[]} per point.
+ *
+ * <p>Points are kept in fixed-size chunks of {@code ARRAY_SIZE} elements. Point {@code i} lives
+ * in chunk {@code i / ARRAY_SIZE} at offset {@code i % ARRAY_SIZE}, both for timestamps (managed
+ * by the superclass) and for values (managed here).
+ */
+public class VectorTVList extends TVList {
+
+  /** Value chunks; every occupied slot of every chunk holds a {@code byte[]}. */
+  private List<Object[]> values;
+
+  /** Scratch value chunks used while sorting; dropped again once the sort finishes. */
+  private byte[][][] sortedValues;
+
+  /** Value saved by {@link #saveAsPivot(int)} and written back by {@link #setPivotTo(int)}. */
+  private byte[] pivotValue;
+
+  VectorTVList() {
+    super();
+    values = new ArrayList<>();
+  }
+
+  /**
+   * Appends one point to the end of the list.
+   *
+   * @param timestamp timestamp of the point
+   * @param value vector payload; the reference is stored as-is, not copied
+   */
+  @Override
+  public void putVector(long timestamp, byte[] value) {
+    checkExpansion();
+    int arrayIndex = size / ARRAY_SIZE;
+    int elementIndex = size % ARRAY_SIZE;
+    minTime = Math.min(minTime, timestamp);
+    timestamps.get(arrayIndex)[elementIndex] = timestamp;
+    values.get(arrayIndex)[elementIndex] = value;
+    size++;
+    // An append that goes back in time means the list is no longer time-ordered.
+    if (sorted && size > 1 && timestamp < getTime(size - 2)) {
+      sorted = false;
+    }
+  }
+
+  /**
+   * Returns the vector stored at {@code index}.
+   *
+   * @param index position of the point
+   * @return the stored byte array (not a copy)
+   * @throws ArrayIndexOutOfBoundsException if {@code index >= size}
+   */
+  @Override
+  public byte[] getVector(int index) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    int arrayIndex = index / ARRAY_SIZE;
+    int elementIndex = index % ARRAY_SIZE;
+    return (byte[]) values.get(arrayIndex)[elementIndex];
+  }
+
+  /**
+   * Overwrites the point at {@code index} with the given timestamp and value.
+   *
+   * @throws ArrayIndexOutOfBoundsException if {@code index >= size}
+   */
+  protected void set(int index, long timestamp, byte[] value) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    int arrayIndex = index / ARRAY_SIZE;
+    int elementIndex = index % ARRAY_SIZE;
+    timestamps.get(arrayIndex)[elementIndex] = timestamp;
+    values.get(arrayIndex)[elementIndex] = value;
+  }
+
+  /**
+   * Creates a new list with its own value chunks. The chunks are copied shallowly, so the
+   * individual {@code byte[]} vectors are shared with this list.
+   */
+  @Override
+  public VectorTVList clone() {
+    VectorTVList cloneList = new VectorTVList();
+    cloneAs(cloneList);
+    for (Object[] valueArray : values) {
+      cloneList.values.add(cloneValue(valueArray));
+    }
+    return cloneList;
+  }
+
+  /** Returns a shallow copy of one value chunk. */
+  private byte[][] cloneValue(Object[] valueArray) {
+    // Allocate a real byte[][]: a freshly created Object[] can never be cast to a more specific
+    // array type, so casting "new Object[n]" fails with ClassCastException at runtime.
+    byte[][] cloneArray = new byte[valueArray.length][];
+    System.arraycopy(valueArray, 0, cloneArray, 0, valueArray.length);
+    return cloneArray;
+  }
+
+  /** Sorts the list, allocating scratch chunks first when they are missing or too small. */
+  @Override
+  public void sort() {
+    if (sortedTimestamps == null || sortedTimestamps.length < size) {
+      sortedTimestamps =
+          (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, size);
+    }
+    if (sortedValues == null || sortedValues.length < size) {
+      sortedValues =
+          (byte[][][]) PrimitiveArrayManager.createDataListsByType(TSDataType.VECTOR, size);
+    }
+    sort(0, size);
+    clearSortedValue();
+    clearSortedTime();
+    sorted = true;
+  }
+
+  /** Hands every value chunk back to {@link PrimitiveArrayManager} and empties the chunk list. */
+  @Override
+  void clearValue() {
+    if (values != null) {
+      // NOTE(review): in this commit PrimitiveArrayManager.release() recognises
+      // TsPrimitiveType[][] for VECTOR while allocation creates byte[][] - confirm they agree.
+      for (Object[] dataArray : values) {
+        PrimitiveArrayManager.release(dataArray);
+      }
+      values.clear();
+    }
+  }
+
+  /** Drops the scratch value chunks used by {@link #sort()}. */
+  @Override
+  void clearSortedValue() {
+    sortedValues = null;
+  }
+
+  /** Copies the point at scratch position {@code src} into list position {@code dest}. */
+  @Override
+  protected void setFromSorted(int src, int dest) {
+    set(
+        dest,
+        sortedTimestamps[src / ARRAY_SIZE][src % ARRAY_SIZE],
+        sortedValues[src / ARRAY_SIZE][src % ARRAY_SIZE]);
+  }
+
+  /** Copies the point at list position {@code src} over list position {@code dest}. */
+  @Override
+  protected void set(int src, int dest) {
+    long srcT = getTime(src);
+    byte[] srcV = getVector(src);
+    set(dest, srcT, srcV);
+  }
+
+  /** Copies the point at list position {@code src} into scratch position {@code dest}. */
+  @Override
+  protected void setToSorted(int src, int dest) {
+    sortedTimestamps[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getTime(src);
+    sortedValues[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getVector(src);
+  }
+
+  /** Reverses the points in the half-open range {@code [lo, hi)} in place. */
+  @Override
+  protected void reverseRange(int lo, int hi) {
+    hi--;
+    while (lo < hi) {
+      long loT = getTime(lo);
+      byte[] loV = getVector(lo);
+      long hiT = getTime(hi);
+      byte[] hiV = getVector(hi);
+      set(lo++, hiT, hiV);
+      set(hi--, loT, loV);
+    }
+  }
+
+  /** Adds one more value chunk to the end of {@link #values}. */
+  @Override
+  protected void expandValues() {
+    values.add((Object[]) getPrimitiveArraysByType(TSDataType.VECTOR));
+  }
+
+  /** Remembers the point at {@code pos} as the current pivot. */
+  @Override
+  protected void saveAsPivot(int pos) {
+    pivotTime = getTime(pos);
+    pivotValue = getVector(pos);
+  }
+
+  /** Writes the remembered pivot point to position {@code pos}. */
+  @Override
+  protected void setPivotTo(int pos) {
+    set(pos, pivotTime, pivotValue);
+  }
+
+  /** Returns the point at {@code index} as a {@link TimeValuePair}. */
+  @Override
+  public TimeValuePair getTimeValuePair(int index) {
+    // NOTE(review): TsPrimitiveType.getByType(VECTOR, v) casts v to TsPrimitiveType[] in this
+    // commit, while getVector() returns byte[] - confirm the intended value representation.
+    return new TimeValuePair(
+        getTime(index), TsPrimitiveType.getByType(TSDataType.VECTOR, getVector(index)));
+  }
+
+  /**
+   * Returns the value at {@code index} paired with the supplied {@code time}.
+   *
+   * @param floatPrecision not used for vectors
+   * @param encoding not used for vectors
+   */
+  @Override
+  protected TimeValuePair getTimeValuePair(
+      int index, long time, Integer floatPrecision, TSEncoding encoding) {
+    // Read through getVector(): this list does not override getBinary(), and the base
+    // implementation rejects the call for lists of any other data type.
+    return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.VECTOR, getVector(index)));
+  }
+
+  /** Removes the last value chunk and hands it back to {@link PrimitiveArrayManager}. */
+  @Override
+  protected void releaseLastValueArray() {
+    PrimitiveArrayManager.release(values.remove(values.size() - 1));
+  }
+
+  /**
+   * Appends the points {@code [start, end)} of the given parallel arrays.
+   *
+   * @param time timestamps of the points
+   * @param value vector payloads; the row references are stored as-is, not copied
+   * @param start first input index, inclusive
+   * @param end last input index, exclusive
+   */
+  @Override
+  public void putVectors(long[] time, byte[][] value, int start, int end) {
+    checkExpansion();
+    int idx = start;
+
+    updateMinTimeAndSorted(time, start, end);
+
+    while (idx < end) {
+      int inputRemaining = end - idx;
+      int arrayIdx = size / ARRAY_SIZE;
+      int elementIdx = size % ARRAY_SIZE;
+      int internalRemaining = ARRAY_SIZE - elementIdx;
+      if (internalRemaining >= inputRemaining) {
+        // the remaining inputs can fit the last array, copy all remaining inputs into last array
+        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining);
+        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining);
+        size += inputRemaining;
+        break;
+      } else {
+        // the remaining inputs cannot fit the last array, fill the last array and create a new
+        // one and enter the next loop
+        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
+        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
+        idx += internalRemaining;
+        size += internalRemaining;
+        checkExpansion();
+      }
+    }
+  }
+
+  /** Always {@link TSDataType#VECTOR}. */
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.VECTOR;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
new file mode 100644
index 0000000..7095612
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Unit tests for {@link VectorTVList}. */
+public class VectorTVListTest {
+
+  /** Number of int columns packed into each test vector. */
+  private static final int COLUMN_COUNT = 5;
+
+  /** Number of bytes {@code BytesUtils.intToBytes} contributes per column. */
+  private static final int INT_BYTES = 4;
+
+  /** Builds a vector made of {@link #COLUMN_COUNT} copies of the encoded int {@code v}. */
+  private static byte[] buildVector(int v) {
+    byte[] encoded = BytesUtils.intToBytes(v);
+    byte[] vector = new byte[INT_BYTES * COLUMN_COUNT];
+    for (int j = 0; j < vector.length; j++) {
+      vector[j] = encoded[j % INT_BYTES];
+    }
+    return vector;
+  }
+
+  /** Points written one at a time must be read back with the same time and the same bytes. */
+  @Test
+  public void testVectorTVList() {
+    VectorTVList tvList = new VectorTVList();
+    for (int i = 0; i < 1000; i++) {
+      tvList.putVector(i, buildVector(i));
+    }
+    Assert.assertEquals(1000, tvList.size);
+    for (int i = 0; i < tvList.size; i++) {
+      // Compare contents: byte[].toString() is an identity string, not the stored value.
+      Assert.assertArrayEquals(buildVector(i), tvList.getVector(i));
+      Assert.assertEquals(i, tvList.getTime(i));
+    }
+  }
+
+  /** A batch written in descending time order must keep its insertion order. */
+  @Test
+  public void testVectorTVLists() {
+    VectorTVList tvList = new VectorTVList();
+    // Java zero-initialises the rows, so no explicit fill is needed.
+    byte[][] vectorList = new byte[1001][INT_BYTES * COLUMN_COUNT];
+    List<Long> timeList = new ArrayList<>();
+    for (int i = 1000; i >= 0; i--) {
+      timeList.add((long) i);
+    }
+    // Only the first 1000 entries (times 1000 down to 1) are inserted.
+    tvList.putVectors(ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorList, 0, 1000);
+    for (long i = 0; i < tvList.size; i++) {
+      Assert.assertEquals(tvList.size - i, tvList.getTime((int) i));
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 0edda45..02a2086 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -41,7 +41,10 @@ public enum TSDataType {
DOUBLE((byte) 4),
/** TEXT */
- TEXT((byte) 5);
+ TEXT((byte) 5),
+
+ /** VECTOR */
+ VECTOR((byte) 6);
private final byte type;
@@ -96,6 +99,7 @@ public enum TSDataType {
case TEXT:
case INT64:
case DOUBLE:
+ case VECTOR:
return 8;
default:
throw new UnSupportedDataTypeException(this.toString());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 73b01d2..32c621f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -45,11 +45,23 @@ public abstract class TsPrimitiveType implements Serializable {
return new TsPrimitiveType.TsDouble((double) v);
case TEXT:
return new TsPrimitiveType.TsBinary((Binary) v);
+ case VECTOR:
+ return new TsPrimitiveType.TsVector((TsPrimitiveType[]) v);
default:
throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
}
}
+ /**
+  * Replaces the vector held by this value. Only vector-typed sub-classes support this.
+  *
+  * @param val the new vector components
+  * @throws UnsupportedOperationException if the sub-class does not hold a vector
+  */
+ public void setVector(TsPrimitiveType[] val) {
+   // Fail loudly, like getBoolean(), rather than silently discarding the value.
+   throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
+ }
+
+ /**
+  * Returns the vector held by this value. Only vector-typed sub-classes support this.
+  *
+  * @return the vector components
+  * @throws UnsupportedOperationException if the sub-class does not hold a vector
+  */
+ public TsPrimitiveType[] getVector() {
+   // Fail loudly, like getBoolean(), rather than handing callers a null to trip over later.
+   throw new UnsupportedOperationException("getVector() is not supported for current sub-class");
+ }
+
public boolean getBoolean() {
throw new UnsupportedOperationException("getBoolean() is not supported for current sub-class");
}
@@ -462,4 +474,54 @@ public abstract class TsPrimitiveType implements Serializable {
return false;
}
}
+
+ /** A {@link TsPrimitiveType} that wraps an array of component values. */
+ public static class TsVector extends TsPrimitiveType {
+
+   /** The vector components; the array passed in is stored as-is, not copied. */
+   private TsPrimitiveType[] value;
+
+   /**
+    * @param value the vector components
+    */
+   public TsVector(TsPrimitiveType[] value) {
+     this.value = value;
+   }
+
+   /** Returns the backing component array (not a copy). */
+   @Override
+   public TsPrimitiveType[] getVector() {
+     return value;
+   }
+
+   /** Replaces the backing component array with {@code val}. */
+   @Override
+   public void setVector(TsPrimitiveType[] val) {
+     this.value = val;
+   }
+
+   /** Returns the sum of the components' sizes plus a fixed overhead of 8. */
+   @Override
+   public int getSize() {
+     int size = 0;
+     for (TsPrimitiveType type : value) {
+       size += type.getSize();
+     }
+     // object header + array object header
+     return 4 + 4 + size;
+   }
+
+   /** Returns the same array as {@link #getVector()}. */
+   @Override
+   public Object getValue() {
+     return getVector();
+   }
+
+   /**
+    * Renders the components as {@code [a, b, c]}; an empty vector renders as {@code []}.
+    *
+    * @return the bracketed, comma-separated string values of the components
+    */
+   @Override
+   public String getStringValue() {
+     StringBuilder builder = new StringBuilder("[");
+     for (int i = 0; i < value.length; i++) {
+       // Separator goes between elements only, so each component is emitted exactly once.
+       if (i > 0) {
+         builder.append(", ");
+       }
+       builder.append(value[i].getStringValue());
+     }
+     builder.append("]");
+     return builder.toString();
+   }
+
+   /** Always {@link TSDataType#VECTOR}. */
+   @Override
+   public TSDataType getDataType() {
+     return TSDataType.VECTOR;
+   }
+ }
}