You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/11 09:39:21 UTC

[iotdb] 01/04: memtable

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch vectorMemTable
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb42aede25ab35a0a43d9e9ad3d14aa7a124765f
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Mar 11 13:55:58 2021 +0800

    memtable
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   6 +
 .../iotdb/db/rescon/PrimitiveArrayManager.java     |   9 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  14 ++
 .../iotdb/db/utils/datastructure/VectorTVList.java | 230 +++++++++++++++++++++
 .../db/utils/datastructure/VectorTVListTest.java   |  67 ++++++
 .../tsfile/file/metadata/enums/TSDataType.java     |   6 +-
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java |  62 ++++++
 7 files changed, 392 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 9397af1..dcd82ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -190,6 +190,12 @@ public class MemTableFlushTask {
               case TEXT:
                 seriesWriterImpl.write(time, tvPairs.getBinary(i));
                 break;
+              case VECTOR:
+                // TODO: 
+//                for ( : tvPairs.getVector(i)) {
+//                  seriesWriterImpl.write(time, tvPairs.getVector(i)[], get);
+//                }
+                break;
               default:
                 LOGGER.error(
                     "Storage group {} does not support data type: {}", storageGroup, dataType);
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index aa6c264..b471332 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
-
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +71,7 @@ public class PrimitiveArrayManager {
     bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>());
     bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>());
     bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>());
+    bufferedArraysMap.put(TSDataType.VECTOR, new ArrayDeque<>());
   }
 
   private PrimitiveArrayManager() {
@@ -127,6 +128,9 @@ public class PrimitiveArrayManager {
       case TEXT:
         dataArray = new Binary[ARRAY_SIZE];
         break;
+      case VECTOR:
+        dataArray = new byte[ARRAY_SIZE][];
+        break;
       default:
         throw new UnSupportedDataTypeException(dataType.toString());
     }
@@ -205,6 +209,9 @@ public class PrimitiveArrayManager {
     } else if (dataArray instanceof Binary[]) {
       Arrays.fill((Binary[]) dataArray, null);
       dataType = TSDataType.TEXT;
+    } else if (dataArray instanceof TsPrimitiveType[][]) {
+      Arrays.fill((TsPrimitiveType[][]) dataArray, null);
+      dataType = TSDataType.VECTOR;
     } else {
       throw new UnSupportedDataTypeException("Unknown data array type");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index d24beae..3e0ef74 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -73,6 +73,8 @@ public abstract class TVList {
         return new DoubleTVList();
       case BOOLEAN:
         return new BooleanTVList();
+      case VECTOR:
+        return new VectorTVList();
       default:
         break;
     }
@@ -137,6 +139,10 @@ public abstract class TVList {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  /**
+   * Appends a single (time, encoded vector) pair. List types that store vectors (e.g. {@code
+   * VectorTVList}) override this; the base implementation rejects the call.
+   *
+   * @param time timestamp of the point
+   * @param value encoded vector value
+   * @throws UnsupportedOperationException when the concrete list does not hold vectors
+   */
+  public void putVector(long time, byte[] value) {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public void putLongs(long[] time, long[] value, int start, int end) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
@@ -161,6 +167,10 @@ public abstract class TVList {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  /**
+   * Bulk-appends {@code time[start..end)} paired with {@code value[start..end)}. Only list types
+   * that store vectors override this; the base implementation rejects the call.
+   *
+   * @throws UnsupportedOperationException when the concrete list does not hold vectors
+   */
+  public void putVectors(long[] time, byte[][] value, int start, int end) {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public long getLong(int index) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
@@ -185,6 +195,10 @@ public abstract class TVList {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
+  /**
+   * Returns the encoded vector at {@code index}. Only list types that store vectors override
+   * this; the base implementation rejects the call.
+   *
+   * @throws UnsupportedOperationException when the concrete list does not hold vectors
+   */
+  public byte[] getVector(int index) {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   public abstract void sort();
 
   public long getMinTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
new file mode 100644
index 0000000..e0a652a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
+
+/**
+ * A {@link TVList} whose values are encoded vectors ({@code byte[]}), stored in chunks of {@code
+ * ARRAY_SIZE} that parallel the timestamp chunks kept by the parent class.
+ */
+public class VectorTVList extends TVList {
+
+  /** Value chunks; chunk {@code k} holds the values for timestamp chunk {@code k}. */
+  private List<Object[]> values;
+
+  /** Scratch value buffers used only while {@link #sort()} runs; dropped afterwards. */
+  private byte[][][] sortedValues;
+
+  /** Value half of the pivot saved by {@link #saveAsPivot(int)}. */
+  private byte[] pivotValue;
+
+  VectorTVList() {
+    super();
+    values = new ArrayList<>();
+  }
+
+  /**
+   * Appends one (timestamp, vector) pair, growing the backing chunks when needed. The {@code
+   * value} array is stored by reference, not copied.
+   */
+  @Override
+  public void putVector(long timestamp, byte[] value) {
+    checkExpansion();
+    int arrayIndex = size / ARRAY_SIZE;
+    int elementIndex = size % ARRAY_SIZE;
+    minTime = Math.min(minTime, timestamp);
+    timestamps.get(arrayIndex)[elementIndex] = timestamp;
+    values.get(arrayIndex)[elementIndex] = value;
+    size++;
+    // an out-of-order timestamp means the list must be sorted before it is read in order
+    if (sorted && size > 1 && timestamp < getTime(size - 2)) {
+      sorted = false;
+    }
+  }
+
+  /**
+   * Returns the vector stored at {@code index}.
+   *
+   * @throws ArrayIndexOutOfBoundsException if {@code index >= size}
+   */
+  @Override
+  public byte[] getVector(int index) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    int arrayIndex = index / ARRAY_SIZE;
+    int elementIndex = index % ARRAY_SIZE;
+    return (byte[]) values.get(arrayIndex)[elementIndex];
+  }
+
+  /**
+   * Overwrites the (timestamp, value) pair at {@code index}.
+   *
+   * @throws ArrayIndexOutOfBoundsException if {@code index >= size}
+   */
+  protected void set(int index, long timestamp, byte[] value) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    int arrayIndex = index / ARRAY_SIZE;
+    int elementIndex = index % ARRAY_SIZE;
+    timestamps.get(arrayIndex)[elementIndex] = timestamp;
+    values.get(arrayIndex)[elementIndex] = value;
+  }
+
+  /** Copies the list structure; the individual {@code byte[]} vectors are shared, not copied. */
+  @Override
+  public VectorTVList clone() {
+    VectorTVList cloneList = new VectorTVList();
+    cloneAs(cloneList);
+    for (Object[] valueArray : values) {
+      cloneList.values.add(cloneValue(valueArray));
+    }
+    return cloneList;
+  }
+
+  /**
+   * Shallow-copies one value chunk. {@code clone()} keeps the runtime array type of the source
+   * chunk, so the copy can later be released like the original. Allocating a plain {@code
+   * Object[]} and casting it to a typed array would always throw ClassCastException.
+   */
+  private Object[] cloneValue(Object[] valueArray) {
+    return valueArray.clone();
+  }
+
+  /** Sorts the list by timestamp with the inherited {@code sort(0, size)}, then drops buffers. */
+  @Override
+  public void sort() {
+    if (sortedTimestamps == null || sortedTimestamps.length < size) {
+      sortedTimestamps =
+          (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, size);
+    }
+    if (sortedValues == null || sortedValues.length < size) {
+      // NOTE(review): this commit does not show createDataListsByType gaining a VECTOR case;
+      // confirm it returns byte[][][] for VECTOR, otherwise sorting fails here.
+      sortedValues =
+          (byte[][][])
+              PrimitiveArrayManager.createDataListsByType(TSDataType.VECTOR, size);
+    }
+    sort(0, size);
+    clearSortedValue();
+    clearSortedTime();
+    sorted = true;
+  }
+
+  /** Returns every value chunk to {@link PrimitiveArrayManager} and empties the list. */
+  @Override
+  void clearValue() {
+    if (values != null) {
+      for (Object[] dataArray : values) {
+        PrimitiveArrayManager.release(dataArray);
+      }
+      values.clear();
+    }
+  }
+
+  /** Drops the scratch buffers allocated by {@link #sort()}. */
+  @Override
+  void clearSortedValue() {
+    sortedValues = null;
+  }
+
+  @Override
+  protected void setFromSorted(int src, int dest) {
+    set(
+        dest,
+        sortedTimestamps[src / ARRAY_SIZE][src % ARRAY_SIZE],
+        sortedValues[src / ARRAY_SIZE][src % ARRAY_SIZE]);
+  }
+
+  @Override
+  protected void set(int src, int dest) {
+    long srcT = getTime(src);
+    byte[] srcV = getVector(src);
+    set(dest, srcT, srcV);
+  }
+
+  @Override
+  protected void setToSorted(int src, int dest) {
+    sortedTimestamps[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getTime(src);
+    sortedValues[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getVector(src);
+  }
+
+  /** Reverses the pairs in the half-open range {@code [lo, hi)} in place. */
+  @Override
+  protected void reverseRange(int lo, int hi) {
+    hi--;
+    while (lo < hi) {
+      long loT = getTime(lo);
+      byte[] loV = getVector(lo);
+      long hiT = getTime(hi);
+      byte[] hiV = getVector(hi);
+      set(lo++, hiT, hiV);
+      set(hi--, loT, loV);
+    }
+  }
+
+  @Override
+  protected void expandValues() {
+    values.add((Object[]) getPrimitiveArraysByType(TSDataType.VECTOR));
+  }
+
+  @Override
+  protected void saveAsPivot(int pos) {
+    pivotTime = getTime(pos);
+    pivotValue = getVector(pos);
+  }
+
+  @Override
+  protected void setPivotTo(int pos) {
+    set(pos, pivotTime, pivotValue);
+  }
+
+  // NOTE(review): TsPrimitiveType.getByType(VECTOR, v) casts v to TsPrimitiveType[], but this list
+  // stores byte[]; both getTimeValuePair overloads fail with ClassCastException until the two
+  // vector representations are reconciled.
+  @Override
+  public TimeValuePair getTimeValuePair(int index) {
+    return new TimeValuePair(
+        getTime(index), TsPrimitiveType.getByType(TSDataType.VECTOR, getVector(index)));
+  }
+
+  @Override
+  protected TimeValuePair getTimeValuePair(
+      int index, long time, Integer floatPrecision, TSEncoding encoding) {
+    // values are vectors; getBinary is not supported by this list type
+    return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.VECTOR, getVector(index)));
+  }
+
+  @Override
+  protected void releaseLastValueArray() {
+    PrimitiveArrayManager.release(values.remove(values.size() - 1));
+  }
+
+  /**
+   * Bulk-appends {@code time[start..end)} with {@code value[start..end)}, filling the current
+   * chunk and allocating new ones as needed. Vector arrays are stored by reference.
+   */
+  @Override
+  public void putVectors(long[] time, byte[][] value, int start, int end) {
+    checkExpansion();
+    int idx = start;
+
+    updateMinTimeAndSorted(time, start, end);
+
+    while (idx < end) {
+      int inputRemaining = end - idx;
+      int arrayIdx = size / ARRAY_SIZE;
+      int elementIdx = size % ARRAY_SIZE;
+      int internalRemaining = ARRAY_SIZE - elementIdx;
+      if (internalRemaining >= inputRemaining) {
+        // the remaining inputs can fit the last array, copy all remaining inputs into last array
+        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining);
+        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining);
+        size += inputRemaining;
+        break;
+      } else {
+        // the remaining inputs cannot fit the last array, fill the last array and create a new
+        // one and enter the next loop
+        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
+        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining);
+        idx += internalRemaining;
+        size += internalRemaining;
+        checkExpansion();
+      }
+    }
+  }
+
+  @Override
+  public TSDataType getDataType() {
+    return TSDataType.VECTOR;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
new file mode 100644
index 0000000..7095612
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class VectorTVListTest {
+
+  /** Number of int-sized columns packed into each test vector. */
+  private static final int COLUMN_NUM = 5;
+
+  /** Builds a vector made of {@code COLUMN_NUM} copies of {@code BytesUtils.intToBytes(i)}. */
+  private static byte[] buildVector(int i) {
+    byte[] intBytes = BytesUtils.intToBytes(i);
+    byte[] vector = new byte[Integer.BYTES * COLUMN_NUM];
+    for (int offset = 0; offset < vector.length; offset += Integer.BYTES) {
+      System.arraycopy(intBytes, 0, vector, offset, Integer.BYTES);
+    }
+    return vector;
+  }
+
+  @Test
+  public void testVectorTVList() {
+    VectorTVList tvList = new VectorTVList();
+    for (int i = 0; i < 1000; i++) {
+      tvList.putVector(i, buildVector(i));
+    }
+    Assert.assertEquals(1000, tvList.size);
+    for (int i = 0; i < tvList.size; i++) {
+      // compare contents: byte[].toString() is an identity string such as "[B@1b6d3586"
+      Assert.assertArrayEquals(buildVector(i), tvList.getVector(i));
+      Assert.assertEquals(i, tvList.getTime(i));
+    }
+  }
+
+  @Test
+  public void testVectorTVLists() {
+    VectorTVList tvList = new VectorTVList();
+    // new arrays are zero-initialised, so every vector is all zeros
+    byte[][] vectorList = new byte[1001][Integer.BYTES * COLUMN_NUM];
+    List<Long> timeList = new ArrayList<>();
+    for (int i = 1000; i >= 0; i--) {
+      timeList.add((long) i);
+    }
+    // only the first 1000 entries (timestamps 1000 down to 1) are inserted
+    tvList.putVectors(ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorList, 0, 1000);
+    Assert.assertEquals(1000, tvList.size);
+    for (long i = 0; i < tvList.size; i++) {
+      Assert.assertEquals(tvList.size - i, tvList.getTime((int) i));
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
index 0edda45..02a2086 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java
@@ -41,7 +41,10 @@ public enum TSDataType {
   DOUBLE((byte) 4),
 
   /** TEXT */
-  TEXT((byte) 5);
+  TEXT((byte) 5),
+
+  /** VECTOR */
+  VECTOR((byte) 6);
 
   private final byte type;
 
@@ -96,6 +99,7 @@ public enum TSDataType {
       case TEXT:
       case INT64:
       case DOUBLE:
+      case VECTOR:
         return 8;
       default:
         throw new UnSupportedDataTypeException(this.toString());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 73b01d2..32c621f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -45,11 +45,23 @@ public abstract class TsPrimitiveType implements Serializable {
         return new TsPrimitiveType.TsDouble((double) v);
       case TEXT:
         return new TsPrimitiveType.TsBinary((Binary) v);
+      case VECTOR:
+        return new TsPrimitiveType.TsVector((TsPrimitiveType[]) v);
       default:
         throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
     }
   }
 
+  /**
+   * Replaces the elements of a vector value. Only {@code TsVector} supports this; every other
+   * sub-class rejects it, like the other typed accessors.
+   *
+   * @throws UnsupportedOperationException when this value is not a vector
+   */
+  public void setVector(TsPrimitiveType[] val) {
+    throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
+  }
+
+  /**
+   * Returns the elements of a vector value. Only {@code TsVector} supports this; every other
+   * sub-class rejects it instead of silently returning {@code null}.
+   *
+   * @throws UnsupportedOperationException when this value is not a vector
+   */
+  public TsPrimitiveType[] getVector() {
+    throw new UnsupportedOperationException("getVector() is not supported for current sub-class");
+  }
+
   public boolean getBoolean() {
     throw new UnsupportedOperationException("getBoolean() is not supported for current sub-class");
   }
@@ -462,4 +474,54 @@ public abstract class TsPrimitiveType implements Serializable {
       return false;
     }
   }
+
+  /** A vector value: an ordered array of per-column primitive values. */
+  public static class TsVector extends TsPrimitiveType {
+
+    private TsPrimitiveType[] value;
+
+    public TsVector(TsPrimitiveType[] value) {
+      this.value = value;
+    }
+
+    @Override
+    public TsPrimitiveType[] getVector() {
+      return value;
+    }
+
+    @Override
+    public void setVector(TsPrimitiveType[] val) {
+      this.value = val;
+    }
+
+    /** Sum of the element sizes plus a fixed 8-byte overhead estimate. */
+    @Override
+    public int getSize() {
+      int size = 0;
+      for (TsPrimitiveType type : value) {
+        size += type.getSize();
+      }
+      // object header + array object header
+      return 4 + 4 + size;
+    }
+
+    @Override
+    public Object getValue() {
+      return getVector();
+    }
+
+    /**
+     * Renders the elements as {@code [v0, v1, ...]}, each element exactly once; an empty vector
+     * renders as {@code []}.
+     */
+    @Override
+    public String getStringValue() {
+      StringBuilder builder = new StringBuilder("[");
+      for (int i = 0; i < value.length; i++) {
+        if (i > 0) {
+          builder.append(", ");
+        }
+        builder.append(value[i].getStringValue());
+      }
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    public TSDataType getDataType() {
+      return TSDataType.VECTOR;
+    }
+
+    /** Content-based hash, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+      return java.util.Arrays.hashCode(value);
+    }
+
+    /** Two vectors are equal when their element arrays are element-wise equal. */
+    @Override
+    public boolean equals(Object anObject) {
+      if (this == anObject) {
+        return true;
+      }
+      if (anObject instanceof TsVector) {
+        return java.util.Arrays.equals(value, ((TsVector) anObject).value);
+      }
+      return false;
+    }
+  }
 }