You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/11/17 08:37:15 UTC

[iotdb] 01/01: improve tv list

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch memtable_sort_in_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56bc73cae3e4b0160c8ae3ed54cc2b0af7b4f3c6
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Tue Nov 17 16:36:40 2020 +0800

    improve tv list
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   2 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  55 +++++----
 .../db/engine/memtable/IWritableMemChunk.java      |  26 +++-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  32 ++++-
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |  21 +++-
 .../iotdb/db/utils/datastructure/TVList.java       | 132 ++++++++++++---------
 .../db/engine/memtable/PrimitiveMemTableTest.java  |   2 +-
 7 files changed, 171 insertions(+), 99 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e9f4c92..98a79ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -89,7 +89,7 @@ public class MemTableFlushTask {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
         MeasurementSchema desc = series.getSchema();
-        TVList tvList = series.getSortedTVList();
+        TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.add(new Pair<>(tvList, desc));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5960d43..026b343 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -46,31 +46,23 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 public abstract class AbstractMemTable implements IMemTable {
 
   private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
-
+  /**
+   * The initial value is true because we want to calculate the text data size when recovering a memTable.
+   */
+  protected boolean disableMemControl = true;
   private long version = Long.MAX_VALUE;
-
   private List<Modification> modifications = new ArrayList<>();
-
   private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig()
       .getAvgSeriesPointNumberThreshold();
-
   /**
    * memory size of data points, including TEXT values
    */
   private long memSize = 0;
-
   /**
    * memory usage of all TVLists memory usage regardless of whether these TVLists are full,
    * including TEXT values
    */
   private long tvListRamCost = 0;
-
-  /**
-   * The initial value is true because we want calculate the text data size when recover
-   * memTable!!
-   */
-  protected boolean disableMemControl = true;
-
   private int seriesNumber = 0;
 
   private long totalPointsNum = 0;
@@ -129,14 +121,16 @@ public abstract class AbstractMemTable implements IMemTable {
       }
 
       Object value = insertRowPlan.getValues()[i];
-      memSize += MemUtils.getRecordSize(insertRowPlan.getMeasurementMNodes()[i].getSchema().getType(), value,
-          disableMemControl);
+      memSize += MemUtils
+          .getRecordSize(insertRowPlan.getMeasurementMNodes()[i].getSchema().getType(), value,
+              disableMemControl);
 
       write(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getMeasurements()[i],
           insertRowPlan.getMeasurementMNodes()[i].getSchema(), insertRowPlan.getTime(), value);
     }
 
-    totalPointsNum += insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
+    totalPointsNum +=
+        insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
   }
 
   @Override
@@ -146,8 +140,9 @@ public abstract class AbstractMemTable implements IMemTable {
     try {
       write(insertTabletPlan, start, end);
       memSize += MemUtils.getRecordSize(insertTabletPlan, start, end, disableMemControl);
-      totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan.getFailedMeasurementNumber())
-        * (end - start);
+      totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan
+          .getFailedMeasurementNumber())
+          * (end - start);
     } catch (RuntimeException e) {
       throw new WriteProcessException(e);
     }
@@ -168,8 +163,10 @@ public abstract class AbstractMemTable implements IMemTable {
       if (insertTabletPlan.getColumns()[i] == null) {
         continue;
       }
-      IWritableMemChunk memSeries = createIfNotExistAndGet(insertTabletPlan.getDeviceId().getFullPath(),
-          insertTabletPlan.getMeasurements()[i], insertTabletPlan.getMeasurementMNodes()[i].getSchema());
+      IWritableMemChunk memSeries = createIfNotExistAndGet(
+          insertTabletPlan.getDeviceId().getFullPath(),
+          insertTabletPlan.getMeasurements()[i],
+          insertTabletPlan.getMeasurementMNodes()[i].getSchema());
       memSeries.write(insertTabletPlan.getTimes(), insertTabletPlan.getColumns()[i],
           insertTabletPlan.getDataTypes()[i], start, end);
     }
@@ -248,11 +245,20 @@ public abstract class AbstractMemTable implements IMemTable {
       return null;
     }
     List<TimeRange> deletionList = constructDeletionList(deviceId, measurement, timeLowerBound);
-    IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
-    TVList chunkCopy = memChunk.getTVList().clone();
 
-    chunkCopy.setDeletionList(deletionList);
-    return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion());
+    TVList chunkCopy = null;
+    int curSize = 0;
+    // synchronize on the memtable map while getting and sorting the list;
+    // when the next query comes, it will find the data already sorted and just take a reference to it
+    synchronized (memTableMap) {
+      IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
+      chunkCopy = memChunk.getSortedTVListForQuery();
+      chunkCopy.increaseReferenceCount();
+      curSize = chunkCopy.size();
+    }
+
+    return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion(),
+        curSize, deletionList);
   }
 
   private List<TimeRange> constructDeletionList(String deviceId, String measurement,
@@ -273,7 +279,8 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   @Override
-  public void delete(PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
+  public void delete(PartialPath originalPath, PartialPath devicePath, long startTimestamp,
+      long endTimestamp) {
     Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath());
     if (deviceMap == null) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 9dc19fd..f733864 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -63,12 +63,30 @@ public interface IWritableMemChunk {
 
   /**
    * served for query requests.
+   * <p>
+   * if the tv list has been sorted, just return a reference to it
+   * <p>
+   * if the tv list hasn't been sorted and has no reference, sort it and return a reference to it
+   * <p>
+   * if the tv list hasn't been sorted and has references, we should copy and sort it, then return
+   * the copy
+   * <p>
+   * the mechanism is just like copy on write
    *
-   * @return
+   * @return sorted tv list
    */
-  default TVList getSortedTVList() {
-    return null;
-  }
+  TVList getSortedTVListForQuery();
+
+  /**
+   * served for flush requests.
+   * <p>
+   * if the tv list has references, copy it first; then sort it (if it is not already sorted)
+   * <p>
+   * the mechanism is just like copy on write
+   *
+   * @return sorted tv list
+   */
+  TVList getSortedTVListForFlush();
 
   default TVList getTVList() {
     return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 3d981d0..7bc76ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -153,8 +153,30 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public synchronized TVList getSortedTVList() {
-    list.sort();
+  public synchronized TVList getSortedTVListForQuery() {
+    // check reference count
+    if (list.getReferenceCount() > 0 && !list.isSorted()) {
+      list = list.clone();
+    }
+
+    if (!list.isSorted()) {
+      list.sort();
+    }
+
+    return list;
+  }
+
+  @Override
+  public TVList getSortedTVListForFlush() {
+    // check reference count
+    if (list.getReferenceCount() > 0) {
+      list = list.clone();
+    }
+
+    if (!list.isSorted()) {
+      list.sort();
+    }
+
     return list;
   }
 
@@ -185,13 +207,13 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   @Override
   public String toString() {
-    int size = getSortedTVList().size();
+    int size = getSortedTVListForQuery().size();
     StringBuilder out = new StringBuilder("MemChunk Size: " + size + System.lineSeparator());
     if (size != 0) {
       out.append("Data type:").append(schema.getType()).append(System.lineSeparator());
-      out.append("First point:").append(getSortedTVList().getTimeValuePair(0))
+      out.append("First point:").append(getSortedTVListForQuery().getTimeValuePair(0))
           .append(System.lineSeparator());
-      out.append("Last point:").append(getSortedTVList().getTimeValuePair(size - 1))
+      out.append("Last point:").append(getSortedTVListForQuery().getTimeValuePair(size - 1))
           .append(System.lineSeparator());
       ;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index 5be7104..d3f2c20 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.querycontext;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.reader.chunk.MemChunkLoader;
@@ -30,10 +31,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 public class ReadOnlyMemChunk {
 
+  // deletion list for this chunk
+  private final List<TimeRange> deletionList;
+
   private String measurementUid;
   private TSDataType dataType;
   private TSEncoding encoding;
@@ -48,8 +53,11 @@ public class ReadOnlyMemChunk {
 
   private IPointReader chunkPointReader;
 
+  private int chunkDataSize;
+
   public ReadOnlyMemChunk(String measurementUid, TSDataType dataType, TSEncoding encoding,
-      TVList tvList, Map<String, String> props, long version)
+      TVList tvList, Map<String, String> props, long version, int size,
+      List<TimeRange> deletionList)
       throws IOException, QueryProcessException {
     this.measurementUid = measurementUid;
     this.dataType = dataType;
@@ -58,9 +66,12 @@ public class ReadOnlyMemChunk {
     if (props != null && props.containsKey(Encoder.MAX_POINT_NUMBER)) {
       this.floatPrecision = Integer.parseInt(props.get(Encoder.MAX_POINT_NUMBER));
     }
-    tvList.sort();
+
     this.chunkData = tvList;
-    this.chunkPointReader = tvList.getIterator(floatPrecision, encoding);
+    this.chunkDataSize = size;
+    this.deletionList = deletionList;
+
+    this.chunkPointReader = tvList.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
     initChunkMeta();
   }
 
@@ -68,7 +79,7 @@ public class ReadOnlyMemChunk {
     Statistics statsByType = Statistics.getStatsByType(dataType);
     ChunkMetadata metaData = new ChunkMetadata(measurementUid, dataType, 0, statsByType);
     if (!isEmpty()) {
-      IPointReader iterator = chunkData.getIterator(floatPrecision, encoding);
+      IPointReader iterator = chunkData.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
       while (iterator.hasNextTimeValuePair()) {
         TimeValuePair timeValuePair = iterator.nextTimeValuePair();
         switch (dataType) {
@@ -114,7 +125,7 @@ public class ReadOnlyMemChunk {
   }
 
   public IPointReader getPointReader() {
-    chunkPointReader = chunkData.getIterator(floatPrecision, encoding);
+    chunkPointReader = chunkData.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
     return chunkPointReader;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 13b4766..5d98727 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -35,31 +37,69 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 public abstract class TVList {
 
-  private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
-
   protected static final int SMALL_ARRAY_LENGTH = 32;
-
+  private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
   protected List<long[]> timestamps;
   protected int size;
 
   protected long[][] sortedTimestamps;
   protected boolean sorted = true;
-
-  /**
-   * this field is effective only in the Tvlist in a RealOnlyMemChunk.
-   */
-  private List<TimeRange> deletionList;
-  private long version;
-
+  // reference count of this tv list
+  // currently the count is only ever increased because we can't know when to decrease it
+  protected AtomicInteger referenceCount;
   protected long pivotTime;
-
   protected long minTime;
 
+  private long version;
 
   public TVList() {
     timestamps = new ArrayList<>();
     size = 0;
     minTime = Long.MAX_VALUE;
+    referenceCount = new AtomicInteger();
+  }
+
+  public static TVList newList(TSDataType dataType) {
+    switch (dataType) {
+      case TEXT:
+        return new BinaryTVList();
+      case FLOAT:
+        return new FloatTVList();
+      case INT32:
+        return new IntTVList();
+      case INT64:
+        return new LongTVList();
+      case DOUBLE:
+        return new DoubleTVList();
+      case BOOLEAN:
+        return new BooleanTVList();
+      default:
+        break;
+    }
+    return null;
+  }
+
+  public static long tvListArrayMemSize(TSDataType type) {
+    long size = 0;
+    // time size
+    size +=
+        PrimitiveArrayManager.ARRAY_SIZE * 8;
+    // value size
+    size +=
+        PrimitiveArrayManager.ARRAY_SIZE * type.getDataTypeSize();
+    return size;
+  }
+
+  public boolean isSorted() {
+    return sorted;
+  }
+
+  public void increaseReferenceCount() {
+    referenceCount.incrementAndGet();
+  }
+
+  public int getReferenceCount() {
+    return referenceCount.get();
   }
 
   public int size() {
@@ -222,9 +262,6 @@ public abstract class TVList {
 
     clearValue();
     clearSortedValue();
-    if (deletionList != null) {
-      deletionList.clear();
-    }
   }
 
   protected void clearTime() {
@@ -245,8 +282,8 @@ public abstract class TVList {
   abstract void clearValue();
 
   /**
-   * The arrays for sorting are not including in write memory now, 
-   * the memory usage is considered as temporary memory.
+   * The arrays for sorting are not included in write memory for now; their memory usage is
+   * considered temporary memory.
    */
   abstract void clearSortedValue();
 
@@ -271,6 +308,7 @@ public abstract class TVList {
     if (sorted) {
       return;
     }
+
     if (lo == hi) {
       return;
     }
@@ -307,33 +345,6 @@ public abstract class TVList {
     return runHi - lo;
   }
 
-  public static TVList newList(TSDataType dataType) {
-    switch (dataType) {
-      case TEXT:
-        return new BinaryTVList();
-      case FLOAT:
-        return new FloatTVList();
-      case INT32:
-        return new IntTVList();
-      case INT64:
-        return new LongTVList();
-      case DOUBLE:
-        return new DoubleTVList();
-      case BOOLEAN:
-        return new BooleanTVList();
-      default:
-        break;
-    }
-    return null;
-  }
-
-  /**
-   * this field is effective only in the Tvlist in a RealOnlyMemChunk.
-   */
-  public void setDeletionList(List<TimeRange> list) {
-    this.deletionList = list;
-  }
-
   protected int compare(int idx1, int idx2) {
     long t1 = getTime(idx1);
     long t2 = getTime(idx2);
@@ -469,23 +480,14 @@ public abstract class TVList {
   protected abstract TimeValuePair getTimeValuePair(int index, long time,
       Integer floatPrecision, TSEncoding encoding);
 
+  @TestOnly
   public IPointReader getIterator() {
     return new Ite();
   }
 
-  public IPointReader getIterator(int floatPrecision, TSEncoding encoding) {
-    return new Ite(floatPrecision, encoding);
-  }
-
-  public static long tvListArrayMemSize(TSDataType type) {
-    long size = 0;
-    // time size
-    size +=
-        PrimitiveArrayManager.ARRAY_SIZE * 8;
-    // value size
-    size +=
-        PrimitiveArrayManager.ARRAY_SIZE * type.getDataTypeSize();
-    return size;
+  public IPointReader getIterator(int floatPrecision, TSEncoding encoding, int size,
+      List<TimeRange> deletionList) {
+    return new Ite(floatPrecision, encoding, size, deletionList);
   }
 
   private class Ite implements IPointReader {
@@ -496,13 +498,24 @@ public abstract class TVList {
     private Integer floatPrecision;
     private TSEncoding encoding;
     private int deleteCursor = 0;
+    /**
+     * because a TV list may be shared by different queries, each iterator has to record its own size
+     */
+    private int iteSize = 0;
+    /**
+     * this field is effective only in the TVList in a ReadOnlyMemChunk.
+     */
+    private List<TimeRange> deletionList;
 
     public Ite() {
+      this.iteSize = TVList.this.size;
     }
 
-    public Ite(int floatPrecision, TSEncoding encoding) {
+    public Ite(int floatPrecision, TSEncoding encoding, int size, List<TimeRange> deletionList) {
       this.floatPrecision = floatPrecision;
       this.encoding = encoding;
+      this.iteSize = size;
+      this.deletionList = deletionList;
     }
 
     @Override
@@ -511,7 +524,7 @@ public abstract class TVList {
         return true;
       }
 
-      while (cur < size) {
+      while (cur < iteSize) {
         long time = getTime(cur);
         if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) {
           cur++;
@@ -522,7 +535,8 @@ public abstract class TVList {
         cur++;
         return true;
       }
-      return hasCachedPair;
+
+      return false;
     }
 
     private boolean isPointDeleted(long timestamp) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 94c6378..b967fe1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -58,7 +58,7 @@ public class PrimitiveMemTableTest {
     for (int i = 0; i < count; i++) {
       series.write(i, i);
     }
-    IPointReader it = series.getSortedTVList().getIterator();
+    IPointReader it = series.getSortedTVListForQuery().getIterator();
     int i = 0;
     while (it.hasNextTimeValuePair()) {
       Assert.assertEquals(i, it.nextTimeValuePair().getTimestamp());