You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/11/17 08:37:14 UTC

[iotdb] branch memtable_sort_in_query created (now 56bc73c)

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a change to branch memtable_sort_in_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 56bc73c  improve tv list

This branch includes the following new commits:

     new 56bc73c  improve tv list

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: improve tv list

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch memtable_sort_in_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56bc73cae3e4b0160c8ae3ed54cc2b0af7b4f3c6
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Tue Nov 17 16:36:40 2020 +0800

    improve tv list
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |   2 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  55 +++++----
 .../db/engine/memtable/IWritableMemChunk.java      |  26 +++-
 .../iotdb/db/engine/memtable/WritableMemChunk.java |  32 ++++-
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |  21 +++-
 .../iotdb/db/utils/datastructure/TVList.java       | 132 ++++++++++++---------
 .../db/engine/memtable/PrimitiveMemTableTest.java  |   2 +-
 7 files changed, 171 insertions(+), 99 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e9f4c92..98a79ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -89,7 +89,7 @@ public class MemTableFlushTask {
         long startTime = System.currentTimeMillis();
         IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
         MeasurementSchema desc = series.getSchema();
-        TVList tvList = series.getSortedTVList();
+        TVList tvList = series.getSortedTVListForFlush();
         sortTime += System.currentTimeMillis() - startTime;
         encodingTaskQueue.add(new Pair<>(tvList, desc));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5960d43..026b343 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -46,31 +46,23 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 public abstract class AbstractMemTable implements IMemTable {
 
   private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
-
+  /**
+   * The initial value is true because we want to calculate the text data size when recovering memTable!!
+   */
+  protected boolean disableMemControl = true;
   private long version = Long.MAX_VALUE;
-
   private List<Modification> modifications = new ArrayList<>();
-
   private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig()
       .getAvgSeriesPointNumberThreshold();
-
   /**
    * memory size of data points, including TEXT values
    */
   private long memSize = 0;
-
   /**
    * memory usage of all TVLists memory usage regardless of whether these TVLists are full,
    * including TEXT values
    */
   private long tvListRamCost = 0;
-
-  /**
-   * The initial value is true because we want calculate the text data size when recover
-   * memTable!!
-   */
-  protected boolean disableMemControl = true;
-
   private int seriesNumber = 0;
 
   private long totalPointsNum = 0;
@@ -129,14 +121,16 @@ public abstract class AbstractMemTable implements IMemTable {
       }
 
       Object value = insertRowPlan.getValues()[i];
-      memSize += MemUtils.getRecordSize(insertRowPlan.getMeasurementMNodes()[i].getSchema().getType(), value,
-          disableMemControl);
+      memSize += MemUtils
+          .getRecordSize(insertRowPlan.getMeasurementMNodes()[i].getSchema().getType(), value,
+              disableMemControl);
 
       write(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getMeasurements()[i],
           insertRowPlan.getMeasurementMNodes()[i].getSchema(), insertRowPlan.getTime(), value);
     }
 
-    totalPointsNum += insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
+    totalPointsNum +=
+        insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
   }
 
   @Override
@@ -146,8 +140,9 @@ public abstract class AbstractMemTable implements IMemTable {
     try {
       write(insertTabletPlan, start, end);
       memSize += MemUtils.getRecordSize(insertTabletPlan, start, end, disableMemControl);
-      totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan.getFailedMeasurementNumber())
-        * (end - start);
+      totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan
+          .getFailedMeasurementNumber())
+          * (end - start);
     } catch (RuntimeException e) {
       throw new WriteProcessException(e);
     }
@@ -168,8 +163,10 @@ public abstract class AbstractMemTable implements IMemTable {
       if (insertTabletPlan.getColumns()[i] == null) {
         continue;
       }
-      IWritableMemChunk memSeries = createIfNotExistAndGet(insertTabletPlan.getDeviceId().getFullPath(),
-          insertTabletPlan.getMeasurements()[i], insertTabletPlan.getMeasurementMNodes()[i].getSchema());
+      IWritableMemChunk memSeries = createIfNotExistAndGet(
+          insertTabletPlan.getDeviceId().getFullPath(),
+          insertTabletPlan.getMeasurements()[i],
+          insertTabletPlan.getMeasurementMNodes()[i].getSchema());
       memSeries.write(insertTabletPlan.getTimes(), insertTabletPlan.getColumns()[i],
           insertTabletPlan.getDataTypes()[i], start, end);
     }
@@ -248,11 +245,20 @@ public abstract class AbstractMemTable implements IMemTable {
       return null;
     }
     List<TimeRange> deletionList = constructDeletionList(deviceId, measurement, timeLowerBound);
-    IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
-    TVList chunkCopy = memChunk.getTVList().clone();
 
-    chunkCopy.setDeletionList(deletionList);
-    return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion());
+    TVList chunkCopy = null;
+    int curSize = 0;
+    // synchronize memtable map to get and sort
+    // when the next query comes, it will find the data already sorted and get a reference to it
+    synchronized (memTableMap) {
+      IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
+      chunkCopy = memChunk.getSortedTVListForQuery();
+      chunkCopy.increaseReferenceCount();
+      curSize = chunkCopy.size();
+    }
+
+    return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion(),
+        curSize, deletionList);
   }
 
   private List<TimeRange> constructDeletionList(String deviceId, String measurement,
@@ -273,7 +279,8 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   @Override
-  public void delete(PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
+  public void delete(PartialPath originalPath, PartialPath devicePath, long startTimestamp,
+      long endTimestamp) {
     Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath());
     if (deviceMap == null) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 9dc19fd..f733864 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -63,12 +63,30 @@ public interface IWritableMemChunk {
 
   /**
    * served for query requests.
+   * <p>
+   * if tv list has been sorted, just return reference of it
+   * <p>
+   * if tv list hasn't been sorted and has no reference, sort and return reference of it
+   * <p>
+   * if tv list hasn't been sorted and has reference we should copy and sort it, then return the
+   * list
+   * <p>
+   * the mechanism is just like copy on write
    *
-   * @return
+   * @return sorted tv list
    */
-  default TVList getSortedTVList() {
-    return null;
-  }
+  TVList getSortedTVListForQuery();
+
+  /**
+   * served for flush requests.
+   * <p>
+   * if tv list has reference, copy it. Then sort it
+   * <p>
+   * the mechanism is just like copy on write
+   *
+   * @return sorted tv list
+   */
+  TVList getSortedTVListForFlush();
 
   default TVList getTVList() {
     return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 3d981d0..7bc76ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -153,8 +153,30 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public synchronized TVList getSortedTVList() {
-    list.sort();
+  public synchronized TVList getSortedTVListForQuery() {
+    // check reference count
+    if (list.getReferenceCount() > 0 && !list.isSorted()) {
+      list = list.clone();
+    }
+
+    if (!list.isSorted()) {
+      list.sort();
+    }
+
+    return list;
+  }
+
+  @Override
+  public TVList getSortedTVListForFlush() {
+    // check reference count
+    if (list.getReferenceCount() > 0) {
+      list = list.clone();
+    }
+
+    if (!list.isSorted()) {
+      list.sort();
+    }
+
     return list;
   }
 
@@ -185,13 +207,13 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   @Override
   public String toString() {
-    int size = getSortedTVList().size();
+    int size = getSortedTVListForQuery().size();
     StringBuilder out = new StringBuilder("MemChunk Size: " + size + System.lineSeparator());
     if (size != 0) {
       out.append("Data type:").append(schema.getType()).append(System.lineSeparator());
-      out.append("First point:").append(getSortedTVList().getTimeValuePair(0))
+      out.append("First point:").append(getSortedTVListForQuery().getTimeValuePair(0))
           .append(System.lineSeparator());
-      out.append("Last point:").append(getSortedTVList().getTimeValuePair(size - 1))
+      out.append("Last point:").append(getSortedTVListForQuery().getTimeValuePair(size - 1))
           .append(System.lineSeparator());
       ;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index 5be7104..d3f2c20 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.querycontext;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.reader.chunk.MemChunkLoader;
@@ -30,10 +31,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 public class ReadOnlyMemChunk {
 
+  // deletion list for this chunk
+  private final List<TimeRange> deletionList;
+
   private String measurementUid;
   private TSDataType dataType;
   private TSEncoding encoding;
@@ -48,8 +53,11 @@ public class ReadOnlyMemChunk {
 
   private IPointReader chunkPointReader;
 
+  private int chunkDataSize;
+
   public ReadOnlyMemChunk(String measurementUid, TSDataType dataType, TSEncoding encoding,
-      TVList tvList, Map<String, String> props, long version)
+      TVList tvList, Map<String, String> props, long version, int size,
+      List<TimeRange> deletionList)
       throws IOException, QueryProcessException {
     this.measurementUid = measurementUid;
     this.dataType = dataType;
@@ -58,9 +66,12 @@ public class ReadOnlyMemChunk {
     if (props != null && props.containsKey(Encoder.MAX_POINT_NUMBER)) {
       this.floatPrecision = Integer.parseInt(props.get(Encoder.MAX_POINT_NUMBER));
     }
-    tvList.sort();
+
     this.chunkData = tvList;
-    this.chunkPointReader = tvList.getIterator(floatPrecision, encoding);
+    this.chunkDataSize = size;
+    this.deletionList = deletionList;
+
+    this.chunkPointReader = tvList.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
     initChunkMeta();
   }
 
@@ -68,7 +79,7 @@ public class ReadOnlyMemChunk {
     Statistics statsByType = Statistics.getStatsByType(dataType);
     ChunkMetadata metaData = new ChunkMetadata(measurementUid, dataType, 0, statsByType);
     if (!isEmpty()) {
-      IPointReader iterator = chunkData.getIterator(floatPrecision, encoding);
+      IPointReader iterator = chunkData.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
       while (iterator.hasNextTimeValuePair()) {
         TimeValuePair timeValuePair = iterator.nextTimeValuePair();
         switch (dataType) {
@@ -114,7 +125,7 @@ public class ReadOnlyMemChunk {
   }
 
   public IPointReader getPointReader() {
-    chunkPointReader = chunkData.getIterator(floatPrecision, encoding);
+    chunkPointReader = chunkData.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
     return chunkPointReader;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 13b4766..5d98727 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -35,31 +37,69 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 public abstract class TVList {
 
-  private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
-
   protected static final int SMALL_ARRAY_LENGTH = 32;
-
+  private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
   protected List<long[]> timestamps;
   protected int size;
 
   protected long[][] sortedTimestamps;
   protected boolean sorted = true;
-
-  /**
-   * this field is effective only in the Tvlist in a RealOnlyMemChunk.
-   */
-  private List<TimeRange> deletionList;
-  private long version;
-
+  // record reference count of this tv list
+  // currently this count will only be increased because we can't know when to decrease it
+  protected AtomicInteger referenceCount;
   protected long pivotTime;
-
   protected long minTime;
 
+  private long version;
 
   public TVList() {
     timestamps = new ArrayList<>();
     size = 0;
     minTime = Long.MAX_VALUE;
+    referenceCount = new AtomicInteger();
+  }
+
+  public static TVList newList(TSDataType dataType) {
+    switch (dataType) {
+      case TEXT:
+        return new BinaryTVList();
+      case FLOAT:
+        return new FloatTVList();
+      case INT32:
+        return new IntTVList();
+      case INT64:
+        return new LongTVList();
+      case DOUBLE:
+        return new DoubleTVList();
+      case BOOLEAN:
+        return new BooleanTVList();
+      default:
+        break;
+    }
+    return null;
+  }
+
+  public static long tvListArrayMemSize(TSDataType type) {
+    long size = 0;
+    // time size
+    size +=
+        PrimitiveArrayManager.ARRAY_SIZE * 8;
+    // value size
+    size +=
+        PrimitiveArrayManager.ARRAY_SIZE * type.getDataTypeSize();
+    return size;
+  }
+
+  public boolean isSorted() {
+    return sorted;
+  }
+
+  public void increaseReferenceCount() {
+    referenceCount.incrementAndGet();
+  }
+
+  public int getReferenceCount() {
+    return referenceCount.get();
   }
 
   public int size() {
@@ -222,9 +262,6 @@ public abstract class TVList {
 
     clearValue();
     clearSortedValue();
-    if (deletionList != null) {
-      deletionList.clear();
-    }
   }
 
   protected void clearTime() {
@@ -245,8 +282,8 @@ public abstract class TVList {
   abstract void clearValue();
 
   /**
-   * The arrays for sorting are not including in write memory now, 
-   * the memory usage is considered as temporary memory.
+   * The arrays for sorting are not including in write memory now, the memory usage is considered as
+   * temporary memory.
    */
   abstract void clearSortedValue();
 
@@ -271,6 +308,7 @@ public abstract class TVList {
     if (sorted) {
       return;
     }
+
     if (lo == hi) {
       return;
     }
@@ -307,33 +345,6 @@ public abstract class TVList {
     return runHi - lo;
   }
 
-  public static TVList newList(TSDataType dataType) {
-    switch (dataType) {
-      case TEXT:
-        return new BinaryTVList();
-      case FLOAT:
-        return new FloatTVList();
-      case INT32:
-        return new IntTVList();
-      case INT64:
-        return new LongTVList();
-      case DOUBLE:
-        return new DoubleTVList();
-      case BOOLEAN:
-        return new BooleanTVList();
-      default:
-        break;
-    }
-    return null;
-  }
-
-  /**
-   * this field is effective only in the Tvlist in a RealOnlyMemChunk.
-   */
-  public void setDeletionList(List<TimeRange> list) {
-    this.deletionList = list;
-  }
-
   protected int compare(int idx1, int idx2) {
     long t1 = getTime(idx1);
     long t2 = getTime(idx2);
@@ -469,23 +480,14 @@ public abstract class TVList {
   protected abstract TimeValuePair getTimeValuePair(int index, long time,
       Integer floatPrecision, TSEncoding encoding);
 
+  @TestOnly
   public IPointReader getIterator() {
     return new Ite();
   }
 
-  public IPointReader getIterator(int floatPrecision, TSEncoding encoding) {
-    return new Ite(floatPrecision, encoding);
-  }
-
-  public static long tvListArrayMemSize(TSDataType type) {
-    long size = 0;
-    // time size
-    size +=
-        PrimitiveArrayManager.ARRAY_SIZE * 8;
-    // value size
-    size +=
-        PrimitiveArrayManager.ARRAY_SIZE * type.getDataTypeSize();
-    return size;
+  public IPointReader getIterator(int floatPrecision, TSEncoding encoding, int size,
+      List<TimeRange> deletionList) {
+    return new Ite(floatPrecision, encoding, size, deletionList);
   }
 
   private class Ite implements IPointReader {
@@ -496,13 +498,24 @@ public abstract class TVList {
     private Integer floatPrecision;
     private TSEncoding encoding;
     private int deleteCursor = 0;
+    /**
+     * because a TV list may be shared among different queries, each iterator has to record its own size
+     */
+    private int iteSize = 0;
+    /**
+     * this field is effective only in the TVList in a ReadOnlyMemChunk.
+     */
+    private List<TimeRange> deletionList;
 
     public Ite() {
+      this.iteSize = TVList.this.size;
     }
 
-    public Ite(int floatPrecision, TSEncoding encoding) {
+    public Ite(int floatPrecision, TSEncoding encoding, int size, List<TimeRange> deletionList) {
       this.floatPrecision = floatPrecision;
       this.encoding = encoding;
+      this.iteSize = size;
+      this.deletionList = deletionList;
     }
 
     @Override
@@ -511,7 +524,7 @@ public abstract class TVList {
         return true;
       }
 
-      while (cur < size) {
+      while (cur < iteSize) {
         long time = getTime(cur);
         if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) {
           cur++;
@@ -522,7 +535,8 @@ public abstract class TVList {
         cur++;
         return true;
       }
-      return hasCachedPair;
+
+      return false;
     }
 
     private boolean isPointDeleted(long timestamp) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 94c6378..b967fe1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -58,7 +58,7 @@ public class PrimitiveMemTableTest {
     for (int i = 0; i < count; i++) {
       series.write(i, i);
     }
-    IPointReader it = series.getSortedTVList().getIterator();
+    IPointReader it = series.getSortedTVListForQuery().getIterator();
     int i = 0;
     while (it.hasNextTimeValuePair()) {
       Assert.assertEquals(i, it.nextTimeValuePair().getTimestamp());