You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/01 05:55:50 UTC

[iotdb] branch new_vector updated: remove more if type=vector

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_vector by this push:
     new f0e4a8e  remove more if type=vector
f0e4a8e is described below

commit f0e4a8e60e6d9f138b26f104cd5afcc24b805ec6
Author: HTHou <hh...@outlook.com>
AuthorDate: Mon Nov 1 13:54:51 2021 +0800

    remove more if type=vector
---
 .../db/engine/memtable/IWritableMemChunk.java      |   2 +-
 .../db/engine/memtable/VectorWritableMemChunk.java |   6 +-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |   5 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  11 +-
 .../apache/iotdb/db/rescon/TVListAllocator.java    |   2 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  65 ++--------
 .../iotdb/db/utils/datastructure/VectorTVList.java | 136 ++++++++++++++++++---
 7 files changed, 143 insertions(+), 84 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 62850ef..11e5ddd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -124,7 +124,7 @@ public interface IWritableMemChunk {
   int delete(long lowerBound, long upperBound);
 
   // For delete one column in the vector
-  int delete(long lowerBound, long upperBound, int columnIndex);
+  int delete(long lowerBound, long upperBound, String measurementId);
 
   IChunkWriter createIChunkWriter();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
index 047f048..c0d44bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/VectorWritableMemChunk.java
@@ -191,12 +191,12 @@ public class VectorWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public int delete(long lowerBound, long upperBound) {
-    // TODO Auto-generated method stub
-    return 0;
+    return list.delete(lowerBound, upperBound);
   }
 
   @Override
-  public int delete(long lowerBound, long upperBound, int columnIndex) {
+  // TODO: THIS METHOD IS FOR DELETING ONE COLUMN OF A VECTOR
+  public int delete(long lowerBound, long upperBound, String measurementId) {
     // TODO Auto-generated method stub
     return 0;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 4bfcee5..4ad3d81 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -257,10 +257,9 @@ public class WritableMemChunk implements IWritableMemChunk {
     return list.delete(lowerBound, upperBound);
   }
 
-  // TODO: THIS METHOLD IS FOR DELETING ONE COLUMN OF A VECTOR
   @Override
-  public int delete(long lowerBound, long upperBound, int columnIndex) {
-    return list.delete(lowerBound, upperBound, columnIndex);
+  public int delete(long lowerBound, long upperBound, String measurementId) {
+    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 83f123c..3a81cfa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.VectorTVList;
 import org.apache.iotdb.db.writelog.WALFlushListener;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -385,13 +386,13 @@ public class TsFileProcessor {
       chunkMetadataIncrement +=
           ChunkMetadata.calculateRamSize(
               insertRowPlan.getMeasurements()[0], insertRowPlan.getDataTypes()[0]);
-      memTableIncrement += TVList.vectorTvListArrayMemSize(insertRowPlan.getDataTypes());
+      memTableIncrement += VectorTVList.vectorTvListArrayMemSize(insertRowPlan.getDataTypes());
     } else {
       // here currentChunkPointNum >= 1
       int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, null);
       memTableIncrement +=
           (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
-              ? TVList.vectorTvListArrayMemSize(insertRowPlan.getDataTypes())
+              ? VectorTVList.vectorTvListArrayMemSize(insertRowPlan.getDataTypes())
               : 0;
     }
     for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
@@ -510,19 +511,19 @@ public class TsFileProcessor {
           dataTypes.length * ChunkMetadata.calculateRamSize(measurementId, dataTypes[0]);
       memIncrements[0] +=
           ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
-              * TVList.vectorTvListArrayMemSize(dataTypes);
+              * VectorTVList.vectorTvListArrayMemSize(dataTypes);
     } else {
       int currentChunkPointNum = workMemTable.getCurrentChunkPointNum(deviceId, null);
       if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
         memIncrements[0] +=
             ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
-                * TVList.vectorTvListArrayMemSize(dataTypes);
+                * VectorTVList.vectorTvListArrayMemSize(dataTypes);
       } else {
         int acquireArray =
             (end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
                 / PrimitiveArrayManager.ARRAY_SIZE;
         memIncrements[0] +=
-            acquireArray == 0 ? 0 : acquireArray * TVList.vectorTvListArrayMemSize(dataTypes);
+            acquireArray == 0 ? 0 : acquireArray * VectorTVList.vectorTvListArrayMemSize(dataTypes);
       }
     }
     // TEXT data size
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
index fab5fab..06902e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TVListAllocator.java
@@ -58,7 +58,7 @@ public class TVListAllocator implements TVListAllocatorMBean, IService {
   }
 
   public synchronized VectorTVList allocate(List<TSDataType> dataTypes) {
-    return TVList.newVectorList(dataTypes);
+    return VectorTVList.newVectorList(dataTypes);
   }
 
   /** For non-vector types. */
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 8276c9a..1a46af4 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -82,10 +82,6 @@ public abstract class TVList {
     return null;
   }
 
-  public static VectorTVList newVectorList(List<TSDataType> datatypes) {
-    return new VectorTVList(datatypes);
-  }
-
   public static long tvListArrayMemSize(TSDataType type) {
     long size = 0;
     // time size
@@ -95,25 +91,6 @@ public abstract class TVList {
     return size;
   }
 
-  /**
-   * For Vector data type.
-   *
-   * @param types the types in the vector
-   * @return VectorTvListArrayMemSize
-   */
-  public static long vectorTvListArrayMemSize(TSDataType[] types) {
-    long size = 0;
-    // time size
-    size += (long) PrimitiveArrayManager.ARRAY_SIZE * 8L;
-    // index size
-    size += (long) PrimitiveArrayManager.ARRAY_SIZE * 4L;
-    // value size
-    for (TSDataType type : types) {
-      size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize();
-    }
-    return size;
-  }
-
   public boolean isSorted() {
     return sorted;
   }
@@ -287,17 +264,9 @@ public abstract class TVList {
       releaseLastTimeArray();
       releaseLastValueArray();
     }
-    if (getDataType() == TSDataType.VECTOR) {
-      return deletedNumber * ((VectorTVList) this).getTsDataTypes().size();
-    }
     return deletedNumber;
   }
 
-  // TODO: THIS METHOLD IS FOR DELETING ONE COLUMN OF A VECTOR
-  public int delete(long lowerBound, long upperBound, int columnIndex) {
-    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
-  }
-
   protected void cloneAs(TVList cloneList) {
     for (long[] timestampArray : timestamps) {
       cloneList.timestamps.add(cloneTime(timestampArray));
@@ -548,18 +517,18 @@ public abstract class TVList {
     return new Ite(floatPrecision, encoding, size, deletionList);
   }
 
-  private class Ite implements IPointReader {
+  protected class Ite implements IPointReader {
 
-    private TimeValuePair cachedTimeValuePair;
-    private boolean hasCachedPair;
-    private int cur;
-    private Integer floatPrecision;
-    private TSEncoding encoding;
+    protected TimeValuePair cachedTimeValuePair;
+    protected boolean hasCachedPair;
+    protected int cur;
+    protected Integer floatPrecision;
+    protected TSEncoding encoding;
     private int deleteCursor = 0;
     /**
      * because TV list may be share with different query, each iterator has to record it's own size
      */
-    private int iteSize = 0;
+    protected int iteSize = 0;
     /** this field is effective only in the Tvlist in a RealOnlyMemChunk. */
     private List<TimeRange> deletionList;
 
@@ -580,30 +549,14 @@ public abstract class TVList {
         return true;
       }
 
-      List<Integer> timeDuplicatedVectorRowIndexList = null;
       while (cur < iteSize) {
         long time = getTime(cur);
         if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) {
-          // record the time duplicated row index list for vector type
-          if (getDataType() == TSDataType.VECTOR) {
-            if (timeDuplicatedVectorRowIndexList == null) {
-              timeDuplicatedVectorRowIndexList = new ArrayList<>();
-              timeDuplicatedVectorRowIndexList.add(getValueIndex(cur));
-            }
-            timeDuplicatedVectorRowIndexList.add(getValueIndex(cur + 1));
-          }
           cur++;
           continue;
         }
         TimeValuePair tvPair;
-        if (getDataType() == TSDataType.VECTOR && timeDuplicatedVectorRowIndexList != null) {
-          tvPair =
-              getTimeValuePairForTimeDuplicatedRows(
-                  timeDuplicatedVectorRowIndexList, time, floatPrecision, encoding);
-          timeDuplicatedVectorRowIndexList = null;
-        } else {
-          tvPair = getTimeValuePair(cur, time, floatPrecision, encoding);
-        }
+        tvPair = getTimeValuePair(cur, time, floatPrecision, encoding);
         cur++;
         if (tvPair.getValue() != null) {
           cachedTimeValuePair = tvPair;
@@ -615,7 +568,7 @@ public abstract class TVList {
       return false;
     }
 
-    private boolean isPointDeleted(long timestamp) {
+    protected boolean isPointDeleted(long timestamp) {
       while (deletionList != null && deleteCursor < deletionList.size()) {
         if (deletionList.get(deleteCursor).contains(timestamp)) {
           return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
index 84ea6bd..24ea2cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java
@@ -20,9 +20,12 @@
 package org.apache.iotdb.db.utils.datastructure;
 
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -65,6 +68,10 @@ public class VectorTVList extends TVList {
     }
   }
 
+  public static VectorTVList newVectorList(List<TSDataType> datatypes) {
+    return new VectorTVList(datatypes);
+  }
+
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   @Override
   public void putVector(long timestamp, Object[] value, int[] columnOrder) {
@@ -340,6 +347,36 @@ public class VectorTVList extends TVList {
     return dataTypes;
   }
 
+  @Override
+  public int delete(long lowerBound, long upperBound) {
+    int newSize = 0;
+    minTime = Long.MAX_VALUE;
+    for (int i = 0; i < size; i++) {
+      long time = getTime(i);
+      if (time < lowerBound || time > upperBound) {
+        set(i, newSize++);
+        minTime = Math.min(time, minTime);
+      }
+    }
+    int deletedNumber = size - newSize;
+    size = newSize;
+    // release primitive arrays that are empty
+    int newArrayNum = newSize / ARRAY_SIZE;
+    if (newSize % ARRAY_SIZE != 0) {
+      newArrayNum++;
+    }
+    for (int releaseIdx = newArrayNum; releaseIdx < timestamps.size(); releaseIdx++) {
+      releaseLastTimeArray();
+      releaseLastValueArray();
+    }
+    return deletedNumber * getTsDataTypes().size();
+  }
+
+  // TODO: THIS METHOD IS FOR DELETING ONE COLUMN OF A VECTOR
+  public int delete(long lowerBound, long upperBound, int columnIndex) {
+    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
+  }
+
   protected void set(int index, long timestamp, int value) {
     int arrayIndex = index / ARRAY_SIZE;
     int elementIndex = index % ARRAY_SIZE;
@@ -565,31 +602,19 @@ public class VectorTVList extends TVList {
 
   @Override
   public TimeValuePair getTimeValuePair(int index) {
-    if (this.dataTypes.size() == 1) {
-      return new TimeValuePair(getTime(index), ((TsPrimitiveType) getVector(index)).getVector()[0]);
-    } else {
-      return new TimeValuePair(getTime(index), (TsPrimitiveType) getVector(index));
-    }
+    return new TimeValuePair(getTime(index), (TsPrimitiveType) getVector(index));
   }
 
   @Override
   protected TimeValuePair getTimeValuePair(
       int index, long time, Integer floatPrecision, TSEncoding encoding) {
-    if (this.dataTypes.size() == 1) {
-      return new TimeValuePair(time, ((TsPrimitiveType) getVector(index)).getVector()[0]);
-    } else {
-      return new TimeValuePair(time, (TsPrimitiveType) getVector(index));
-    }
+    return new TimeValuePair(time, (TsPrimitiveType) getVector(index));
   }
 
   @Override
   public TimeValuePair getTimeValuePairForTimeDuplicatedRows(
       List<Integer> indexList, long time, Integer floatPrecision, TSEncoding encoding) {
-    if (this.dataTypes.size() == 1) {
-      return new TimeValuePair(time, getVector(indexList).getVector()[0]);
-    } else {
-      return new TimeValuePair(time, getVector(indexList));
-    }
+    return new TimeValuePair(time, getVector(indexList));
   }
 
   @Override
@@ -718,4 +743,85 @@ public class VectorTVList extends TVList {
   public TSDataType getDataType() {
     return TSDataType.VECTOR;
   }
+
+  /**
+   * Get the size of a single VectorTVList array for the given types.
+   *
+   * @param types the types in the vector
+   * @return VectorTvListArrayMemSize
+   */
+  public static long vectorTvListArrayMemSize(TSDataType[] types) {
+    long size = 0;
+    // time size
+    size += (long) PrimitiveArrayManager.ARRAY_SIZE * 8L;
+    // index size
+    size += (long) PrimitiveArrayManager.ARRAY_SIZE * 4L;
+    // value size
+    for (TSDataType type : types) {
+      size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize();
+    }
+    return size;
+  }
+
+  @Override
+  @TestOnly
+  public IPointReader getIterator() {
+    return new VectorIte();
+  }
+
+  @Override
+  public IPointReader getIterator(
+      int floatPrecision, TSEncoding encoding, int size, List<TimeRange> deletionList) {
+    return new VectorIte(floatPrecision, encoding, size, deletionList);
+  }
+
+  private class VectorIte extends Ite {
+
+    public VectorIte() {
+      super();
+    }
+
+    public VectorIte(
+        int floatPrecision, TSEncoding encoding, int size, List<TimeRange> deletionList) {
+      super(floatPrecision, encoding, size, deletionList);
+    }
+
+    @Override
+    public boolean hasNextTimeValuePair() {
+      if (hasCachedPair) {
+        return true;
+      }
+
+      List<Integer> timeDuplicatedVectorRowIndexList = null;
+      while (cur < iteSize) {
+        long time = getTime(cur);
+        if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) {
+          if (timeDuplicatedVectorRowIndexList == null) {
+            timeDuplicatedVectorRowIndexList = new ArrayList<>();
+            timeDuplicatedVectorRowIndexList.add(getValueIndex(cur));
+          }
+          timeDuplicatedVectorRowIndexList.add(getValueIndex(cur + 1));
+          cur++;
+          continue;
+        }
+        TimeValuePair tvPair;
+        if (timeDuplicatedVectorRowIndexList != null) {
+          tvPair =
+              getTimeValuePairForTimeDuplicatedRows(
+                  timeDuplicatedVectorRowIndexList, time, floatPrecision, encoding);
+          timeDuplicatedVectorRowIndexList = null;
+        } else {
+          tvPair = getTimeValuePair(cur, time, floatPrecision, encoding);
+        }
+        cur++;
+        if (tvPair.getValue() != null) {
+          cachedTimeValuePair = tvPair;
+          hasCachedPair = true;
+          return true;
+        }
+      }
+
+      return false;
+    }
+  }
 }