You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/11/17 08:37:15 UTC
[iotdb] 01/01: improve tv list
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch memtable_sort_in_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56bc73cae3e4b0160c8ae3ed54cc2b0af7b4f3c6
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Tue Nov 17 16:36:40 2020 +0800
improve tv list
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 2 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 55 +++++----
.../db/engine/memtable/IWritableMemChunk.java | 26 +++-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 32 ++++-
.../db/engine/querycontext/ReadOnlyMemChunk.java | 21 +++-
.../iotdb/db/utils/datastructure/TVList.java | 132 ++++++++++++---------
.../db/engine/memtable/PrimitiveMemTableTest.java | 2 +-
7 files changed, 171 insertions(+), 99 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index e9f4c92..98a79ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -89,7 +89,7 @@ public class MemTableFlushTask {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
MeasurementSchema desc = series.getSchema();
- TVList tvList = series.getSortedTVList();
+ TVList tvList = series.getSortedTVListForFlush();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.add(new Pair<>(tvList, desc));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5960d43..026b343 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -46,31 +46,23 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public abstract class AbstractMemTable implements IMemTable {
private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
-
+ /**
+ * The initial value is true because we want to calculate the text data size when recovering the memTable.
+ */
+ protected boolean disableMemControl = true;
private long version = Long.MAX_VALUE;
-
private List<Modification> modifications = new ArrayList<>();
-
private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig()
.getAvgSeriesPointNumberThreshold();
-
/**
* memory size of data points, including TEXT values
*/
private long memSize = 0;
-
/**
* memory usage of all TVLists memory usage regardless of whether these TVLists are full,
* including TEXT values
*/
private long tvListRamCost = 0;
-
- /**
- * The initial value is true because we want calculate the text data size when recover
- * memTable!!
- */
- protected boolean disableMemControl = true;
-
private int seriesNumber = 0;
private long totalPointsNum = 0;
@@ -129,14 +121,16 @@ public abstract class AbstractMemTable implements IMemTable {
}
Object value = insertRowPlan.getValues()[i];
- memSize += MemUtils.getRecordSize(insertRowPlan.getMeasurementMNodes()[i].getSchema().getType(), value,
- disableMemControl);
+ memSize += MemUtils
+ .getRecordSize(insertRowPlan.getMeasurementMNodes()[i].getSchema().getType(), value,
+ disableMemControl);
write(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getMeasurements()[i],
insertRowPlan.getMeasurementMNodes()[i].getSchema(), insertRowPlan.getTime(), value);
}
- totalPointsNum += insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
+ totalPointsNum +=
+ insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
}
@Override
@@ -146,8 +140,9 @@ public abstract class AbstractMemTable implements IMemTable {
try {
write(insertTabletPlan, start, end);
memSize += MemUtils.getRecordSize(insertTabletPlan, start, end, disableMemControl);
- totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan.getFailedMeasurementNumber())
- * (end - start);
+ totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan
+ .getFailedMeasurementNumber())
+ * (end - start);
} catch (RuntimeException e) {
throw new WriteProcessException(e);
}
@@ -168,8 +163,10 @@ public abstract class AbstractMemTable implements IMemTable {
if (insertTabletPlan.getColumns()[i] == null) {
continue;
}
- IWritableMemChunk memSeries = createIfNotExistAndGet(insertTabletPlan.getDeviceId().getFullPath(),
- insertTabletPlan.getMeasurements()[i], insertTabletPlan.getMeasurementMNodes()[i].getSchema());
+ IWritableMemChunk memSeries = createIfNotExistAndGet(
+ insertTabletPlan.getDeviceId().getFullPath(),
+ insertTabletPlan.getMeasurements()[i],
+ insertTabletPlan.getMeasurementMNodes()[i].getSchema());
memSeries.write(insertTabletPlan.getTimes(), insertTabletPlan.getColumns()[i],
insertTabletPlan.getDataTypes()[i], start, end);
}
@@ -248,11 +245,20 @@ public abstract class AbstractMemTable implements IMemTable {
return null;
}
List<TimeRange> deletionList = constructDeletionList(deviceId, measurement, timeLowerBound);
- IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
- TVList chunkCopy = memChunk.getTVList().clone();
- chunkCopy.setDeletionList(deletionList);
- return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion());
+ TVList chunkCopy = null;
+ int curSize = 0;
+ // synchronize on the memtable map while getting and sorting the chunk
+ // when the next query comes, it will find the data already sorted and just get a reference to it
+ synchronized (memTableMap) {
+ IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
+ chunkCopy = memChunk.getSortedTVListForQuery();
+ chunkCopy.increaseReferenceCount();
+ curSize = chunkCopy.size();
+ }
+
+ return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion(),
+ curSize, deletionList);
}
private List<TimeRange> constructDeletionList(String deviceId, String measurement,
@@ -273,7 +279,8 @@ public abstract class AbstractMemTable implements IMemTable {
}
@Override
- public void delete(PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
+ public void delete(PartialPath originalPath, PartialPath devicePath, long startTimestamp,
+ long endTimestamp) {
Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath());
if (deviceMap == null) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 9dc19fd..f733864 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -63,12 +63,30 @@ public interface IWritableMemChunk {
/**
* served for query requests.
+ * <p>
+ * If the TV list has already been sorted, just return a reference to it.
+ * <p>
+ * If the TV list has not been sorted and has no references, sort it and return a reference to it.
+ * <p>
+ * If the TV list has not been sorted and has references, copy it, sort the copy, and return the
+ * copy.
+ * <p>
+ * the mechanism is just like copy on write
*
- * @return
+ * @return sorted tv list
*/
- default TVList getSortedTVList() {
- return null;
- }
+ TVList getSortedTVListForQuery();
+
+ /**
+ * served for flush requests.
+ * <p>
+ * If the TV list has references, copy it first; then sort it.
+ * <p>
+ * the mechanism is just like copy on write
+ *
+ * @return sorted tv list
+ */
+ TVList getSortedTVListForFlush();
default TVList getTVList() {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 3d981d0..7bc76ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -153,8 +153,30 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public synchronized TVList getSortedTVList() {
- list.sort();
+ public synchronized TVList getSortedTVListForQuery() {
+ // check reference count
+ if (list.getReferenceCount() > 0 && !list.isSorted()) {
+ list = list.clone();
+ }
+
+ if (!list.isSorted()) {
+ list.sort();
+ }
+
+ return list;
+ }
+
+ @Override
+ public TVList getSortedTVListForFlush() {
+ // check reference count
+ if (list.getReferenceCount() > 0) {
+ list = list.clone();
+ }
+
+ if (!list.isSorted()) {
+ list.sort();
+ }
+
return list;
}
@@ -185,13 +207,13 @@ public class WritableMemChunk implements IWritableMemChunk {
@Override
public String toString() {
- int size = getSortedTVList().size();
+ int size = getSortedTVListForQuery().size();
StringBuilder out = new StringBuilder("MemChunk Size: " + size + System.lineSeparator());
if (size != 0) {
out.append("Data type:").append(schema.getType()).append(System.lineSeparator());
- out.append("First point:").append(getSortedTVList().getTimeValuePair(0))
+ out.append("First point:").append(getSortedTVListForQuery().getTimeValuePair(0))
.append(System.lineSeparator());
- out.append("Last point:").append(getSortedTVList().getTimeValuePair(size - 1))
+ out.append("Last point:").append(getSortedTVListForQuery().getTimeValuePair(size - 1))
.append(System.lineSeparator());
;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index 5be7104..d3f2c20 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.querycontext;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.reader.chunk.MemChunkLoader;
@@ -30,10 +31,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
public class ReadOnlyMemChunk {
+ // deletion list for this chunk
+ private final List<TimeRange> deletionList;
+
private String measurementUid;
private TSDataType dataType;
private TSEncoding encoding;
@@ -48,8 +53,11 @@ public class ReadOnlyMemChunk {
private IPointReader chunkPointReader;
+ private int chunkDataSize;
+
public ReadOnlyMemChunk(String measurementUid, TSDataType dataType, TSEncoding encoding,
- TVList tvList, Map<String, String> props, long version)
+ TVList tvList, Map<String, String> props, long version, int size,
+ List<TimeRange> deletionList)
throws IOException, QueryProcessException {
this.measurementUid = measurementUid;
this.dataType = dataType;
@@ -58,9 +66,12 @@ public class ReadOnlyMemChunk {
if (props != null && props.containsKey(Encoder.MAX_POINT_NUMBER)) {
this.floatPrecision = Integer.parseInt(props.get(Encoder.MAX_POINT_NUMBER));
}
- tvList.sort();
+
this.chunkData = tvList;
- this.chunkPointReader = tvList.getIterator(floatPrecision, encoding);
+ this.chunkDataSize = size;
+ this.deletionList = deletionList;
+
+ this.chunkPointReader = tvList.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
initChunkMeta();
}
@@ -68,7 +79,7 @@ public class ReadOnlyMemChunk {
Statistics statsByType = Statistics.getStatsByType(dataType);
ChunkMetadata metaData = new ChunkMetadata(measurementUid, dataType, 0, statsByType);
if (!isEmpty()) {
- IPointReader iterator = chunkData.getIterator(floatPrecision, encoding);
+ IPointReader iterator = chunkData.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
switch (dataType) {
@@ -114,7 +125,7 @@ public class ReadOnlyMemChunk {
}
public IPointReader getPointReader() {
- chunkPointReader = chunkData.getIterator(floatPrecision, encoding);
+ chunkPointReader = chunkData.getIterator(floatPrecision, encoding, chunkDataSize, deletionList);
return chunkPointReader;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 13b4766..5d98727 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -25,7 +25,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -35,31 +37,69 @@ import org.apache.iotdb.tsfile.utils.Binary;
public abstract class TVList {
- private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
-
protected static final int SMALL_ARRAY_LENGTH = 32;
-
+ private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
protected List<long[]> timestamps;
protected int size;
protected long[][] sortedTimestamps;
protected boolean sorted = true;
-
- /**
- * this field is effective only in the Tvlist in a RealOnlyMemChunk.
- */
- private List<TimeRange> deletionList;
- private long version;
-
+ // reference count of this TV list
+ // currently the count is only ever increased, because we cannot know when to decrease it
+ protected AtomicInteger referenceCount;
protected long pivotTime;
-
protected long minTime;
+ private long version;
public TVList() {
timestamps = new ArrayList<>();
size = 0;
minTime = Long.MAX_VALUE;
+ referenceCount = new AtomicInteger();
+ }
+
+ public static TVList newList(TSDataType dataType) {
+ switch (dataType) {
+ case TEXT:
+ return new BinaryTVList();
+ case FLOAT:
+ return new FloatTVList();
+ case INT32:
+ return new IntTVList();
+ case INT64:
+ return new LongTVList();
+ case DOUBLE:
+ return new DoubleTVList();
+ case BOOLEAN:
+ return new BooleanTVList();
+ default:
+ break;
+ }
+ return null;
+ }
+
+ public static long tvListArrayMemSize(TSDataType type) {
+ long size = 0;
+ // time size
+ size +=
+ PrimitiveArrayManager.ARRAY_SIZE * 8;
+ // value size
+ size +=
+ PrimitiveArrayManager.ARRAY_SIZE * type.getDataTypeSize();
+ return size;
+ }
+
+ public boolean isSorted() {
+ return sorted;
+ }
+
+ public void increaseReferenceCount() {
+ referenceCount.incrementAndGet();
+ }
+
+ public int getReferenceCount() {
+ return referenceCount.get();
}
public int size() {
@@ -222,9 +262,6 @@ public abstract class TVList {
clearValue();
clearSortedValue();
- if (deletionList != null) {
- deletionList.clear();
- }
}
protected void clearTime() {
@@ -245,8 +282,8 @@ public abstract class TVList {
abstract void clearValue();
/**
- * The arrays for sorting are not including in write memory now,
- * the memory usage is considered as temporary memory.
+ * The arrays used for sorting are not included in write memory for now; their memory usage is
+ * considered temporary memory.
*/
abstract void clearSortedValue();
@@ -271,6 +308,7 @@ public abstract class TVList {
if (sorted) {
return;
}
+
if (lo == hi) {
return;
}
@@ -307,33 +345,6 @@ public abstract class TVList {
return runHi - lo;
}
- public static TVList newList(TSDataType dataType) {
- switch (dataType) {
- case TEXT:
- return new BinaryTVList();
- case FLOAT:
- return new FloatTVList();
- case INT32:
- return new IntTVList();
- case INT64:
- return new LongTVList();
- case DOUBLE:
- return new DoubleTVList();
- case BOOLEAN:
- return new BooleanTVList();
- default:
- break;
- }
- return null;
- }
-
- /**
- * this field is effective only in the Tvlist in a RealOnlyMemChunk.
- */
- public void setDeletionList(List<TimeRange> list) {
- this.deletionList = list;
- }
-
protected int compare(int idx1, int idx2) {
long t1 = getTime(idx1);
long t2 = getTime(idx2);
@@ -469,23 +480,14 @@ public abstract class TVList {
protected abstract TimeValuePair getTimeValuePair(int index, long time,
Integer floatPrecision, TSEncoding encoding);
+ @TestOnly
public IPointReader getIterator() {
return new Ite();
}
- public IPointReader getIterator(int floatPrecision, TSEncoding encoding) {
- return new Ite(floatPrecision, encoding);
- }
-
- public static long tvListArrayMemSize(TSDataType type) {
- long size = 0;
- // time size
- size +=
- PrimitiveArrayManager.ARRAY_SIZE * 8;
- // value size
- size +=
- PrimitiveArrayManager.ARRAY_SIZE * type.getDataTypeSize();
- return size;
+ public IPointReader getIterator(int floatPrecision, TSEncoding encoding, int size,
+ List<TimeRange> deletionList) {
+ return new Ite(floatPrecision, encoding, size, deletionList);
}
private class Ite implements IPointReader {
@@ -496,13 +498,24 @@ public abstract class TVList {
private Integer floatPrecision;
private TSEncoding encoding;
private int deleteCursor = 0;
+ /**
+ * because a TV list may be shared by different queries, each iterator has to record its own size
+ */
+ private int iteSize = 0;
+ /**
+ * this field is effective only in the TVList of a ReadOnlyMemChunk.
+ */
+ private List<TimeRange> deletionList;
public Ite() {
+ this.iteSize = TVList.this.size;
}
- public Ite(int floatPrecision, TSEncoding encoding) {
+ public Ite(int floatPrecision, TSEncoding encoding, int size, List<TimeRange> deletionList) {
this.floatPrecision = floatPrecision;
this.encoding = encoding;
+ this.iteSize = size;
+ this.deletionList = deletionList;
}
@Override
@@ -511,7 +524,7 @@ public abstract class TVList {
return true;
}
- while (cur < size) {
+ while (cur < iteSize) {
long time = getTime(cur);
if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) {
cur++;
@@ -522,7 +535,8 @@ public abstract class TVList {
cur++;
return true;
}
- return hasCachedPair;
+
+ return false;
}
private boolean isPointDeleted(long timestamp) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 94c6378..b967fe1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -58,7 +58,7 @@ public class PrimitiveMemTableTest {
for (int i = 0; i < count; i++) {
series.write(i, i);
}
- IPointReader it = series.getSortedTVList().getIterator();
+ IPointReader it = series.getSortedTVListForQuery().getIterator();
int i = 0;
while (it.hasNextTimeValuePair()) {
Assert.assertEquals(i, it.nextTimeValuePair().getTimestamp());