You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/28 03:57:18 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add TVListAllocator and reuse sorting buffer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 742cc55 add TVListAllocator and reuse sorting buffer
new 3e0e95c Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
742cc55 is described below
commit 742cc55bc5079826e51ba98273bb44d05f944f16
Author: 江天 <jt...@163.com>
AuthorDate: Fri Jun 28 11:51:22 2019 +0800
add TVListAllocator and reuse sorting buffer
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 8 +++-
.../filenodeV2/UnsealedTsFileProcessorV2.java | 7 +++-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 24 +++++++++---
.../iotdb/db/engine/memtable/EmptyMemTable.java | 2 +-
.../apache/iotdb/db/engine/memtable/IMemTable.java | 5 +++
.../db/engine/memtable/IWritableMemChunk.java | 2 +
.../db/engine/memtable/MemTableFlushTaskV2.java | 6 ++-
.../db/engine/memtable/PrimitiveMemTable.java | 4 +-
.../db/engine/memtable/WritableMemChunkV2.java | 9 ++++-
.../iotdb/db/utils/datastructure/DoubleTVList.java | 13 ++++---
.../iotdb/db/utils/datastructure/LongTVList.java | 15 ++++----
.../iotdb/db/utils/datastructure/TVList.java | 2 +
.../datastructure/TVListAllocator.java} | 43 +++++++---------------
.../writelog/recover/TsFileRecoverPerformer.java | 2 +
.../filenodeV2/UnsealedTsFileProcessorV2Test.java | 9 +++--
.../engine/memtable/MemTableFlushTaskV2Test.java | 2 +
.../db/engine/memtable/MemtableBenchmark.java | 2 +
.../db/engine/memtable/PrimitiveMemTableTest.java | 4 ++
.../iotdb/db/writelog/recover/LogReplayerTest.java | 2 +
19 files changed, 101 insertions(+), 60 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 61b077d..16d11ad 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -47,6 +47,8 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -107,6 +109,8 @@ public class FileNodeProcessorV2 {
*/
private ModificationFile mergingModification;
+ private TVListAllocator allocator = new TVListAllocator();
+
public FileNodeProcessorV2(String baseDir, String storageGroupName) throws ProcessorException {
this.storageGroupName = storageGroupName;
@@ -343,11 +347,11 @@ public class FileNodeProcessorV2 {
if (sequence) {
return new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback,
- this::updateLatestFlushTimeCallback);
+ this::updateLatestFlushTimeCallback, allocator);
} else {
return new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback,
- () -> true);
+ () -> true, allocator);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index e708a44..80826d0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -85,6 +86,8 @@ public class UnsealedTsFileProcessorV2 {
private WriteLogNode logNode;
+ private TVListAllocator allocator;
+
/**
* sync this object in query() and asyncFlush()
*/
@@ -93,7 +96,7 @@ public class UnsealedTsFileProcessorV2 {
public UnsealedTsFileProcessorV2(String storageGroupName, File tsfile, FileSchema fileSchema,
VersionController versionController,
Consumer<UnsealedTsFileProcessorV2> closeUnsealedFileCallback,
- Supplier flushUpdateLatestFlushTimeCallback)
+ Supplier flushUpdateLatestFlushTimeCallback, TVListAllocator allocator)
throws IOException {
this.storageGroupName = storageGroupName;
this.fileSchema = fileSchema;
@@ -102,6 +105,7 @@ public class UnsealedTsFileProcessorV2 {
this.writer = new NativeRestorableIOWriter(tsfile);
this.closeUnsealedFileCallback = closeUnsealedFileCallback;
this.flushUpdateLatestFlushTimeCallback = flushUpdateLatestFlushTimeCallback;
+ this.allocator = allocator;
LOGGER.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
}
@@ -118,6 +122,7 @@ public class UnsealedTsFileProcessorV2 {
if (workMemTable == null) {
// TODO change the impl of getEmptyMemTable to non-blocking
workMemTable = MemTablePool.getInstance().getEmptyMemTable(this);
+ workMemTable.setTVListAllocator(allocator);
// no empty memtable, return failure
if (workMemTable == null) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index ac2c5ca..fe3a5f5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public abstract class AbstractMemTable implements IMemTable {
@@ -42,6 +43,8 @@ public abstract class AbstractMemTable implements IMemTable {
private long memSize = 0;
+ protected TVListAllocator allocator;
+
public AbstractMemTable() {
this.memTableMap = new HashMap<>();
}
@@ -71,12 +74,12 @@ public abstract class AbstractMemTable implements IMemTable {
}
Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
if (!memSeries.containsKey(measurement)) {
- memSeries.put(measurement, genMemSeries(dataType));
+ memSeries.put(measurement, genMemSeries(dataType, deviceId + PATH_SEPARATOR + measurement));
}
return memSeries.get(measurement);
}
- protected abstract IWritableMemChunk genMemSeries(TSDataType dataType);
+ protected abstract IWritableMemChunk genMemSeries(TSDataType dataType, String path);
@Override
@@ -177,7 +180,7 @@ public abstract class AbstractMemTable implements IMemTable {
if (chunk == null) {
return true;
}
- IWritableMemChunk newChunk = filterChunk(chunk, timestamp);
+ IWritableMemChunk newChunk = filterChunk(chunk, timestamp, deviceId + PATH_SEPARATOR + measurementId);
if (newChunk != null) {
deviceMap.put(measurementId, newChunk);
return newChunk.count() != chunk.count();
@@ -200,11 +203,11 @@ public abstract class AbstractMemTable implements IMemTable {
* @return A reduced copy of chunk if chunk contains data with timestamp less than 'timestamp', or
* null.
*/
- private IWritableMemChunk filterChunk(IWritableMemChunk chunk, long timestamp) {
+ private IWritableMemChunk filterChunk(IWritableMemChunk chunk, long timestamp, String path) {
List<TimeValuePair> timeValuePairs = chunk.getSortedTimeValuePairList();
if (!timeValuePairs.isEmpty() && timeValuePairs.get(0).getTimestamp() <= timestamp) {
TSDataType dataType = chunk.getType();
- IWritableMemChunk newChunk = genMemSeries(dataType);
+ IWritableMemChunk newChunk = genMemSeries(dataType, path);
for (TimeValuePair pair : timeValuePairs) {
if (pair.getTimestamp() > timestamp) {
switch (dataType) {
@@ -231,6 +234,7 @@ public abstract class AbstractMemTable implements IMemTable {
}
}
}
+ allocator.release(path, chunk.getTVList());
return newChunk;
}
return null;
@@ -249,4 +253,14 @@ public abstract class AbstractMemTable implements IMemTable {
public long getVersion() {
return version;
}
+
+ @Override
+ public void setTVListAllocator(TVListAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ @Override
+ public TVListAllocator getTVListAllocator() {
+ return allocator;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
index 5b31b57..88d5945 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/EmptyMemTable.java
@@ -9,7 +9,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class EmptyMemTable extends AbstractMemTable {
@Override
- protected IWritableMemChunk genMemSeries(TSDataType dataType) {
+ protected IWritableMemChunk genMemSeries(TSDataType dataType, String path) {
return null;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index f6395f8..9f0a453 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
@@ -97,4 +98,8 @@ public interface IMemTable {
long getVersion();
void setVersion(long version);
+
+ void setTVListAllocator(TVListAllocator allocator);
+
+ TVListAllocator getTVListAllocator();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index cf8ccbb..f59ab3e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -65,4 +65,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
* @return
*/
default TVList getSortedTVList(){return null;}
+
+ default TVList getTVList(){return null;}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
index 0df3bc0..58cac01 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -107,6 +108,7 @@ public class MemTableFlushTaskV2 {
private Runnable EncodingTask = new Runnable() {
@Override
public void run() {
+ String currDevice = null;
try {
long memSerializeTime = 0;
boolean noMoreMessages = false;
@@ -129,6 +131,7 @@ public class MemTableFlushTaskV2 {
}
} else {
if (task instanceof String) {
+ currDevice = (String) task;
ioTaskQueue.add(task);
} else if (task instanceof ChunkGroupIoTask) {
ioTaskQueue.add(task);
@@ -141,6 +144,8 @@ public class MemTableFlushTaskV2 {
try {
writeOneSeries(encodingMessage.left, seriesWriter,
encodingMessage.right.getType());
+ memTable.getTVListAllocator().release(currDevice + IoTDBConstant.PATH_SEPARATOR
+ + encodingMessage.right.getMeasurementId(), encodingMessage.left);
ioTaskQueue.add(seriesWriter);
} catch (IOException e) {
LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup,
@@ -218,7 +223,6 @@ public class MemTableFlushTaskV2 {
private void writeOneSeries(TVList tvPairs, IChunkWriter seriesWriterImpl,
TSDataType dataType)
throws IOException {
-
for (int i = 0; i < tvPairs.size(); i++) {
long time = tvPairs.getTime(i);
if (time < tvPairs.getTimeOffset() ||
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 2901e3e..5f55b45 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -34,8 +34,8 @@ public class PrimitiveMemTable extends AbstractMemTable {
}
@Override
- protected IWritableMemChunk genMemSeries(TSDataType dataType) {
- return new WritableMemChunkV2(dataType);
+ protected IWritableMemChunk genMemSeries(TSDataType dataType, String path) {
+ return new WritableMemChunkV2(dataType, allocator.allocate(path, dataType));
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
index b303b6d..14a8335 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java
@@ -40,9 +40,9 @@ public class WritableMemChunkV2 implements IWritableMemChunk {
private TSDataType dataType;
private TVList list;
- public WritableMemChunkV2(TSDataType dataType) {
+ public WritableMemChunkV2(TSDataType dataType, TVList list) {
this.dataType = dataType;
- this.list = TVList.newList(dataType);
+ this.list = list;
}
@Override
@@ -135,6 +135,11 @@ public class WritableMemChunkV2 implements IWritableMemChunk {
}
@Override
+ public TVList getTVList() {
+ return list;
+ }
+
+ @Override
public long count() {
return list.size();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index f60cf0e..a09e17b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -82,7 +82,7 @@ public class DoubleTVList extends TVList {
}
return cloneList;
}
-
+
private double[] cloneValue(double[] array) {
double[] cloneArray = new double[array.length];
System.arraycopy(array, 0, cloneArray, 0, array.length);
@@ -90,12 +90,14 @@ public class DoubleTVList extends TVList {
}
public void sort() {
- sortedTimestamps = new long[size];
- sortedValues = new double[size];
+ if (sortedTimestamps == null || sortedTimestamps.length < size) {
+ sortedTimestamps = new long[size];
+ }
+ if (sortedValues == null || sortedValues.length < size) {
+ sortedValues = new double[size];
+ }
sort(0, size);
sorted = true;
- values = null;
- timestamps = null;
}
@Override
@@ -141,4 +143,5 @@ public class DoubleTVList extends TVList {
protected void setPivotTo(int pos) {
set(pos, pivotTime, pivotValue);
}
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index b0741e4..6b149fa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -89,17 +89,15 @@ public class LongTVList extends TVList {
return cloneArray;
}
- public void reset() {
- size = 0;
- }
-
public void sort() {
- sortedTimestamps = new long[size];
- sortedValues = new long[size];
+ if (sortedTimestamps == null || sortedTimestamps.length < size) {
+ sortedTimestamps = new long[size];
+ }
+ if (sortedValues == null || sortedValues.length < size) {
+ sortedValues = new long[size];
+ }
sort(0, size);
sorted = true;
- values = null;
- timestamps = null;
}
@Override
@@ -145,4 +143,5 @@ public class LongTVList extends TVList {
protected void setPivotTo(int pos) {
set(pos, pivotTime, pivotValue);
}
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index cb67aff..14af6e3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -142,6 +142,8 @@ public abstract class TVList {
public void reset() {
size = 0;
limit = 0;
+ timeOffset = -1;
+ sorted = false;
}
protected void checkExpansion() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVListAllocator.java
similarity index 55%
copy from iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
copy to iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVListAllocator.java
index 2901e3e..3ddeffa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVListAllocator.java
@@ -17,42 +17,25 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.memtable;
+package org.apache.iotdb.db.utils.datastructure;
-import java.util.HashMap;
import java.util.Map;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-public class PrimitiveMemTable extends AbstractMemTable {
-
- public PrimitiveMemTable() {
- }
-
- public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
- super(memTableMap);
- }
+public class TVListAllocator {
+ private Map<String, ConcurrentLinkedQueue<TVList>> tvListCache = new ConcurrentHashMap<>();
- @Override
- protected IWritableMemChunk genMemSeries(TSDataType dataType) {
- return new WritableMemChunkV2(dataType);
+ public TVList allocate(String identifier, TSDataType dataType) {
+ ConcurrentLinkedQueue<TVList> tvLists = tvListCache.computeIfAbsent(identifier,
+ k -> new ConcurrentLinkedQueue<>());
+ TVList list = tvLists.poll();
+ return list != null ? list : TVList.newList(dataType);
}
- @Override
- public IMemTable copy() {
- Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>(getMemTableMap());
-
- return new PrimitiveMemTable(newMap);
+ public void release(String identifier, TVList list) {
+ list.reset();
+ tvListCache.get(identifier).add(list);
}
-
- @Override
- public boolean isManagedByMemPool() {
- return true;
- }
-
- @Override
- public boolean equals(Object obj) {
- return this == obj;
- }
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 11f4a1d..fe418ee 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.memtable.MemTableFlushTaskV2;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -70,6 +71,7 @@ public class TsFileRecoverPerformer {
*/
public void recover() throws ProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
+ recoverMemTable.setTVListAllocator(new TVListAllocator());
this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
versionController,
tsFileResource, fileSchema, recoverMemTable, acceptUnseq);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 24b6d6c..bdd53dc 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.FileSchemaUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -71,7 +72,8 @@ public class UnsealedTsFileProcessorV2Test {
public void testWriteAndFlush()
throws WriteProcessException, IOException, UnsealedTsFileProcessorException {
processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
- FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{}, ()-> true);
+ FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{},
+ ()-> true, new TVListAllocator());
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
.query(deviceId, measurementId, dataType, props);
@@ -116,7 +118,8 @@ public class UnsealedTsFileProcessorV2Test {
public void testMultiFlush()
throws WriteProcessException, IOException, UnsealedTsFileProcessorException {
processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
- FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{}, ()->true);
+ FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{},
+ ()->true, new TVListAllocator());
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
.query(deviceId, measurementId, dataType, props);
@@ -159,7 +162,7 @@ public class UnsealedTsFileProcessorV2Test {
}
resource.close();
}
- }, ()->true);
+ }, ()->true, new TVListAllocator());
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
.query(deviceId, measurementId, dataType, props);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
index 2566fa3..e552a42 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2Test.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
import org.junit.After;
@@ -46,6 +47,7 @@ public class MemTableFlushTaskV2Test {
writer = new NativeRestorableIOWriter(new File(filePath));
memTable = new PrimitiveMemTable();
+ memTable.setTVListAllocator(new TVListAllocator());
}
@After
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
index cf5dfa3..b97a05c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/MemtableBenchmark.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.memtable;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/**
@@ -40,6 +41,7 @@ public class MemtableBenchmark {
public static void main(String[] args) {
IMemTable memTable = new PrimitiveMemTable();
+ memTable.setTVListAllocator(new TVListAllocator());
final long startTime = System.currentTimeMillis();
// cpu not locality
for (int i = 0; i < numOfPoint; i++) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 01f43bb..12cce43 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
import org.apache.iotdb.db.utils.MathUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -62,6 +63,7 @@ public class PrimitiveMemTableTest {
@Test
public void simpleTest() {
IMemTable memTable = new PrimitiveMemTable();
+ memTable.setTVListAllocator(new TVListAllocator());
int count = 10;
String deviceId = "d1";
String measurementId[] = new String[count];
@@ -129,6 +131,7 @@ public class PrimitiveMemTableTest {
@Test
public void testFloatType() {
IMemTable memTable = new PrimitiveMemTable();
+ memTable.setTVListAllocator(new TVListAllocator());
String deviceId = "d1";
int size = 1000000;
write(memTable, deviceId, "s1", TSDataType.FLOAT, size);
@@ -137,6 +140,7 @@ public class PrimitiveMemTableTest {
@Test
public void testAllType() {
IMemTable memTable = new PrimitiveMemTable();
+ memTable.setTVListAllocator(new TVListAllocator());
int count = 10;
String deviceId = "d1";
String measurementId[] = new String[count];
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 9c58c41..aa45f2e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.datastructure.TVListAllocator;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -70,6 +71,7 @@ public class LogReplayerTest {
};
TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsFile);
IMemTable memTable = new PrimitiveMemTable();
+ memTable.setTVListAllocator(new TVListAllocator());
FileSchema schema = new FileSchema();
try {