You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/10 08:04:09 UTC
[iotdb] branch master updated: [IOTDB-4784] Control total memory for enabling time partition for storage engine (#7792)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 487b3edc16 [IOTDB-4784] Control total memory for enabling time partition for storage engine (#7792)
487b3edc16 is described below
commit 487b3edc16be602e0648225e7ee07845c23cd7c0
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Nov 10 16:04:02 2022 +0800
[IOTDB-4784] Control total memory for enabling time partition for storage engine (#7792)
---
.../resources/conf/iotdb-common.properties | 11 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 ++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 30 ++-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 9 +
.../engine/memtable/AlignedWritableMemChunk.java | 10 +
.../memtable/AlignedWritableMemChunkGroup.java | 5 +
.../apache/iotdb/db/engine/memtable/IMemTable.java | 2 +
.../db/engine/memtable/IWritableMemChunk.java | 6 +-
.../db/engine/memtable/IWritableMemChunkGroup.java | 2 +
.../iotdb/db/engine/memtable/WritableMemChunk.java | 9 +-
.../db/engine/memtable/WritableMemChunkGroup.java | 25 ++-
.../iotdb/db/engine/storagegroup/DataRegion.java | 240 ++++++++++++---------
.../db/engine/storagegroup/DataRegionInfo.java | 2 +-
...hTimeManager.java => HashLastFlushTimeMap.java} | 235 ++++++++++----------
...meManager.java => IDTableLastFlushTimeMap.java} | 179 +++++++--------
...lushTimeManager.java => ILastFlushTimeMap.java} | 34 +--
.../db/engine/storagegroup/TimePartitionInfo.java | 66 ++++++
.../engine/storagegroup/TimePartitionManager.java | 165 ++++++++++++++
.../db/engine/storagegroup/TsFileManager.java | 6 +
.../db/engine/storagegroup/TsFileProcessor.java | 9 +-
.../db/engine/storagegroup/TsFileResource.java | 6 +
.../db/metadata/idtable/entry/DeviceEntry.java | 37 +---
.../org/apache/iotdb/db/rescon/SystemInfo.java | 31 +--
.../db/utils/datastructure/AlignedTVList.java | 4 +-
.../iotdb/db/utils/datastructure/BinaryTVList.java | 17 +-
.../db/utils/datastructure/BooleanTVList.java | 13 +-
.../iotdb/db/utils/datastructure/DoubleTVList.java | 13 +-
.../iotdb/db/utils/datastructure/FloatTVList.java | 13 +-
.../iotdb/db/utils/datastructure/IntTVList.java | 13 +-
.../iotdb/db/utils/datastructure/LongTVList.java | 13 +-
.../iotdb/db/utils/datastructure/TVList.java | 20 +-
.../db/engine/storagegroup/DataRegionTest.java | 2 +-
.../storagegroup/IDTableLastFlushTimeMapTest.java | 129 +++++++++++
.../engine/storagegroup/LastFlushTimeMapTest.java | 223 +++++++++++++++++++
.../iotdb/db/engine/storagegroup/TTLTest.java | 5 +-
.../storagegroup/TimePartitionManagerTest.java | 129 +++++++++++
.../engine/storagegroup/TsFileProcessorV2Test.java | 12 +-
.../db/metadata/idtable/IDTableFlushTimeTest.java | 199 -----------------
38 files changed, 1273 insertions(+), 675 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 8122732058..772de865b2 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -130,20 +130,25 @@
# Datatype: boolean
# enable_mem_control=true
-# Memory Allocation Ratio: Write, Read, Schema, Consensus and Free Memory.
+# Memory Allocation Ratio: StorageEngine, QueryEngine, SchemaEngine, Consensus and Free Memory.
# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 6:2:1:1:1
# If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:1:2
-# write_read_schema_free_memory_proportion=3:3:1:1:2
+# storage_query_schema_consensus_free_memory_proportion=3:3:1:1:2
# Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, PartitionCache and LastCache.
# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
# In cluster mode, we recommend 5:3:1:1. In standalone mode, we recommend 8:1:0:1
# schema_memory_allocate_proportion=5:3:1:1
-# Memory allocation ratio in StorageEngine: MemTable, Compaction
+# Memory allocation ratio in StorageEngine: Write, Compaction
# The parameter form is a:b, where a and b are integers. for example: 8:2 , 7:3
# storage_engine_memory_proportion=8:2
+# Memory allocation ratio in writing: Memtable, TimePartitionInfo
+# Memtable is the total memory size of all memtables
+# TimePartitionInfo is the total memory size of the last flush times recorded for all data regions
+# write_memory_proportion=19:1
+
# Max number of concurrent writing time partitions in one storage group
# This parameter is used to control total memTable number when memory control is disabled
# The max number of memTable is 4 * concurrent_writing_time_partition * storage group number
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 882caaf97f..04696f99bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -150,15 +150,15 @@ public class IoTDBConfig {
private double rejectProportion = 0.8;
/** The proportion of storage engine memory for memtable */
- private double writeProportion = 0.8;
+ private double writeProportionForMemtable = 0.8;
/** The proportion of storage engine memory for compaction */
private double compactionProportion = 0.2;
/**
* If memory cost of data region increased more than proportion of {@linkplain
- * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain IoTDBConfig#getWriteProportion()},
- * report to system.
+ * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
+ * IoTDBConfig#getWriteProportionForMemtable()}, report to system.
*/
private double writeMemoryVariationReportProportion = 0.001;
@@ -501,6 +501,8 @@ public class IoTDBConfig {
/** Memory allocated proportion for timeIndex */
private long allocateMemoryForTimeIndex = allocateMemoryForRead * 200 / 1001;
+ /** Memory allocated for time partition info, taken from the storage engine's memory */
+ private long allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 50 / 1001;
/**
* If true, we will estimate each query's possible memory footprint before executing it and deny
* it if its estimated memory exceeds current free memory
@@ -2108,6 +2110,14 @@ public class IoTDBConfig {
this.allocateMemoryForTimeIndex = allocateMemoryForTimeIndex;
}
+ public long getAllocateMemoryForTimePartitionInfo() {
+ return allocateMemoryForTimePartitionInfo;
+ }
+
+ public void setAllocateMemoryForTimePartitionInfo(long allocateMemoryForTimePartitionInfo) {
+ this.allocateMemoryForTimePartitionInfo = allocateMemoryForTimePartitionInfo;
+ }
+
public boolean isEnableQueryMemoryEstimation() {
return enableQueryMemoryEstimation;
}
@@ -3269,12 +3279,12 @@ public class IoTDBConfig {
this.driverTaskExecutionTimeSliceInMs = driverTaskExecutionTimeSliceInMs;
}
- public double getWriteProportion() {
- return writeProportion;
+ public double getWriteProportionForMemtable() {
+ return writeProportionForMemtable;
}
- public void setWriteProportion(double writeProportion) {
- this.writeProportion = writeProportion;
+ public void setWriteProportionForMemtable(double writeProportionForMemtable) {
+ this.writeProportionForMemtable = writeProportionForMemtable;
}
public double getCompactionProportion() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ef945bb55b..9222ce9621 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1591,7 +1591,7 @@ public class IoTDBDescriptor {
private void initMemoryAllocate(Properties properties) {
String memoryAllocateProportion =
- properties.getProperty("write_read_schema_free_memory_proportion");
+ properties.getProperty("storage_query_schema_consensus_free_memory_proportion");
if (memoryAllocateProportion != null) {
String[] proportions = memoryAllocateProportion.split(":");
int proportionSum = 0;
@@ -1685,14 +1685,34 @@ public class IoTDBDescriptor {
private void initStorageEngineAllocate(Properties properties) {
String allocationRatio = properties.getProperty("storage_engine_memory_proportion", "8:2");
String[] proportions = allocationRatio.split(":");
- int proportionForMemTable = Integer.parseInt(proportions[0].trim());
+ int proportionForWrite = Integer.parseInt(proportions[0].trim());
int proportionForCompaction = Integer.parseInt(proportions[1].trim());
- conf.setWriteProportion(
+
+ double writeProportion =
+ ((double) (proportionForWrite) / (double) (proportionForCompaction + proportionForWrite));
+
+ String allocationRatioForWrite = properties.getProperty("write_memory_proportion", "19:1");
+ proportions = allocationRatioForWrite.split(":");
+ int proportionForMemTable = Integer.parseInt(proportions[0].trim());
+ int proportionForTimePartitionInfo = Integer.parseInt(proportions[1].trim());
+
+ double memtableProportionForWrite =
((double) (proportionForMemTable)
- / (double) (proportionForCompaction + proportionForMemTable)));
+ / (double) (proportionForMemTable + proportionForTimePartitionInfo));
+ Double.parseDouble(properties.getProperty("flush_time_memory_proportion", "0.05"));
+ double timePartitionInfoForWrite =
+ ((double) (proportionForTimePartitionInfo)
+ / (double) (proportionForMemTable + proportionForTimePartitionInfo));
+ conf.setWriteProportionForMemtable(writeProportion * memtableProportionForWrite);
+
+ conf.setAllocateMemoryForTimePartitionInfo(
+ (long)
+ ((writeProportion * timePartitionInfoForWrite)
+ * conf.getAllocateMemoryForStorageEngine()));
+
conf.setCompactionProportion(
((double) (proportionForCompaction)
- / (double) (proportionForCompaction + proportionForMemTable)));
+ / (double) (proportionForCompaction + proportionForWrite)));
}
private void initSchemaMemoryAllocate(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index e34b79a173..a7790b7764 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -631,6 +631,15 @@ public abstract class AbstractMemTable implements IMemTable {
}
}
+ @Override
+ public Map<String, Long> getMaxTime() {
+ Map<String, Long> latestTimeForEachDevice = new HashMap<>();
+ for (Entry<IDeviceID, IWritableMemChunkGroup> entry : memTableMap.entrySet()) {
+ latestTimeForEachDevice.put(entry.getKey().toStringID(), entry.getValue().getMaxTime());
+ }
+ return latestTimeForEachDevice;
+ }
+
public static class Factory {
private Factory() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index 845fc8c4b3..dabd3b472d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -237,6 +237,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
return null;
}
+ @Override
+ public long getMaxTime() {
+ return list.getMaxTime();
+ }
+
@Override
public synchronized TVList getSortedTvListForQuery() {
sortTVList();
@@ -445,6 +450,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
.getTimestamp();
}
+ @Override
+ public boolean isEmpty() {
+ return list.rowCount() == 0;
+ }
+
@Override
public int serializedSize() {
int size = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
index 25856adf27..878f3b691e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -135,6 +135,11 @@ public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
return memChunk.getTVList().rowCount();
}
+ @Override
+ public long getMaxTime() {
+ return memChunk.getMaxTime();
+ }
+
public AlignedWritableMemChunk getAlignedMemChunk() {
return memChunk;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 9101efdf7c..293d1276b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -169,4 +169,6 @@ public interface IMemTable extends WALEntryValue {
FlushStatus getFlushStatus();
void setFlushStatus(FlushStatus flushStatus);
+
+ Map<String, Long> getMaxTime();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 5237e53535..6eec2dc559 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -124,8 +124,8 @@ public interface IWritableMemChunk extends WALEntryValue {
return null;
}
- default long getMinTime() {
- return Long.MIN_VALUE;
+ default long getMaxTime() {
+ return Long.MAX_VALUE;
}
/** @return how many points are deleted */
@@ -140,4 +140,6 @@ public interface IWritableMemChunk extends WALEntryValue {
long getFirstPoint();
long getLastPoint();
+
+ boolean isEmpty();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
index e3d089f5a7..3ef362cdaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
@@ -52,4 +52,6 @@ public interface IWritableMemChunkGroup extends WALEntryValue {
PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp);
long getCurrentTVListSize(String measurement);
+
+ long getMaxTime();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index c8bd6144f4..11b6f55d4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -246,8 +246,8 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public long getMinTime() {
- return list.getMinTime();
+ public long getMaxTime() {
+ return list.getMaxTime();
}
@Override
@@ -268,6 +268,11 @@ public class WritableMemChunk implements IWritableMemChunk {
.getTimestamp();
}
+ @Override
+ public boolean isEmpty() {
+ return list.rowCount() == 0;
+ }
+
@Override
public int delete(long lowerBound, long upperBound) {
return list.delete(lowerBound, upperBound);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
index 128a3d6403..e8f2d12e3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -129,16 +129,28 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup {
IWritableMemChunk chunk = entry.getValue();
if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
iter.remove();
+ deletedPointsNumber += chunk.count();
+ chunk.release();
+ } else {
+ deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
+ if (chunk.count() == 0) {
+ iter.remove();
+ }
}
- deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
}
} else {
IWritableMemChunk chunk = memChunkMap.get(targetMeasurement);
if (chunk != null) {
if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
memChunkMap.remove(targetMeasurement);
+ deletedPointsNumber += chunk.count();
+ chunk.release();
+ } else {
+ deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
+ if (chunk.count() == 0) {
+ memChunkMap.remove(targetMeasurement);
+ }
}
- deletedPointsNumber += chunk.delete(startTimestamp, endTimestamp);
}
}
@@ -150,6 +162,15 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup {
return memChunkMap.get(measurement).getTVList().rowCount();
}
+ @Override
+ public long getMaxTime() {
+ long maxTime = Long.MIN_VALUE;
+ for (IWritableMemChunk memChunk : memChunkMap.values()) {
+ maxTime = Math.max(maxTime, memChunk.getMaxTime());
+ }
+ return maxTime;
+ }
+
@Override
public int serializedSize() {
int size = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index aef9ff8ac5..75b26d34ff 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -254,7 +255,7 @@ public class DataRegion {
/** flush listeners */
private List<FlushListener> customFlushListeners = Collections.emptyList();
- private ILastFlushTimeManager lastFlushTimeManager;
+ private ILastFlushTimeMap lastFlushTimeMap;
/**
* record the insertWriteLock in SG is being hold by which method, it will be empty string if on
@@ -300,9 +301,9 @@ public class DataRegion {
// if use id table, we use id table flush time manager
if (config.isEnableIDTable()) {
idTable = IDTableManager.getInstance().getIDTableDirectly(storageGroupName);
- lastFlushTimeManager = new IDTableFlushTimeManager(idTable);
+ lastFlushTimeMap = new IDTableLastFlushTimeMap(idTable, tsFileManager);
} else {
- lastFlushTimeManager = new LastFlushTimeManager();
+ lastFlushTimeMap = new HashLastFlushTimeMap(tsFileManager);
}
// recover tsfiles unless consensus protocol is ratis and storage engine is not ready
@@ -364,7 +365,7 @@ public class DataRegion {
private Map<Long, List<TsFileResource>> splitResourcesByPartition(
List<TsFileResource> resources) {
- Map<Long, List<TsFileResource>> ret = new HashMap<>();
+ Map<Long, List<TsFileResource>> ret = new TreeMap<>();
for (TsFileResource resource : resources) {
ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource);
}
@@ -487,15 +488,25 @@ public class DataRegion {
}
WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
// recover sealed TsFiles
- for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
- for (TsFileResource tsFileResource : value) {
- recoverSealedTsFiles(tsFileResource, DataRegionRecoveryContext, true);
+ if (!partitionTmpSeqTsFiles.isEmpty()) {
+ long latestPartitionId =
+ ((TreeMap<Long, List<TsFileResource>>) partitionTmpSeqTsFiles).lastKey();
+ for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) {
+ recoverFilesInPartition(
+ partitionFiles.getKey(),
+ DataRegionRecoveryContext,
+ partitionFiles.getValue(),
+ true,
+ partitionFiles.getKey() == latestPartitionId);
}
}
- for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
- for (TsFileResource tsFileResource : value) {
- recoverSealedTsFiles(tsFileResource, DataRegionRecoveryContext, false);
- }
+ for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpUnseqTsFiles.entrySet()) {
+ recoverFilesInPartition(
+ partitionFiles.getKey(),
+ DataRegionRecoveryContext,
+ partitionFiles.getValue(),
+ false,
+ false);
}
// wait until all unsealed TsFiles have been recovered
for (WALRecoverListener recoverListener : recoverListeners) {
@@ -549,9 +560,8 @@ public class DataRegion {
long endTime = resource.getEndTime(deviceId);
endTimeMap.put(deviceId.intern(), endTime);
}
- lastFlushTimeManager.setMultiDeviceLastTime(timePartitionId, endTimeMap);
- lastFlushTimeManager.setMultiDeviceFlushedTime(timePartitionId, endTimeMap);
- lastFlushTimeManager.setMultiDeviceGlobalFlushedTime(endTimeMap);
+ lastFlushTimeMap.setMultiDeviceFlushedTime(timePartitionId, endTimeMap);
+ lastFlushTimeMap.setMultiDeviceGlobalFlushedTime(endTimeMap);
}
}
@@ -601,13 +611,12 @@ public class DataRegion {
for (String deviceId : resource.getDevices()) {
long endTime = resource.getEndTime(deviceId);
long endTimePartitionId = StorageEngineV2.getTimePartition(endTime);
- lastFlushTimeManager.setOneDeviceLastTime(endTimePartitionId, deviceId, endTime);
- lastFlushTimeManager.setOneDeviceGlobalFlushedTime(deviceId, endTime);
+ lastFlushTimeMap.setOneDeviceGlobalFlushedTime(deviceId, endTime);
// set all the covered partition's LatestFlushedTime
long partitionId = StorageEngineV2.getTimePartition(resource.getStartTime(deviceId));
while (partitionId <= endTimePartitionId) {
- lastFlushTimeManager.setOneDeviceFlushedTime(partitionId, deviceId, endTime);
+ lastFlushTimeMap.setOneDeviceFlushedTime(partitionId, deviceId, endTime);
if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
File directory =
SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
@@ -754,13 +763,16 @@ public class DataRegion {
// the last file is not closed, continue writing to it
RestorableTsFileIOWriter writer = recoverPerformer.getWriter();
long timePartitionId = tsFileResource.getTimePartition();
+ TimePartitionManager.getInstance()
+ .updateAfterOpeningTsFileProcessor(
+ new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId);
TsFileProcessor tsFileProcessor =
new TsFileProcessor(
dataRegionId,
dataRegionInfo,
tsFileResource,
this::closeUnsealedTsFileProcessorCallBack,
- isSeq ? this::updateLatestFlushTimeCallback : this::unsequenceFlushCallback,
+ isSeq ? this::sequenceFlushCallback : this::unsequenceFlushCallback,
isSeq,
writer);
if (isSeq) {
@@ -787,7 +799,6 @@ public class DataRegion {
}
tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize);
}
- updateLastFlushTime(tsFileResource, isSeq);
}
tsFileManager.add(tsFileResource, recoverPerformer.isSequence());
}
@@ -811,7 +822,6 @@ public class DataRegion {
}
sealedTsFile.close();
tsFileManager.add(sealedTsFile, isSeq);
- updateLastFlushTime(sealedTsFile, isSeq);
tsFileResourceManager.registerSealedTsFileResource(sealedTsFile);
} catch (DataRegionException | IOException e) {
logger.error("Fail to recover sealed TsFile {}, skip it.", sealedTsFile.getTsFilePath(), e);
@@ -821,6 +831,32 @@ public class DataRegion {
}
}
+ private void recoverFilesInPartition(
+ long partitionId,
+ DataRegionRecoveryContext context,
+ List<TsFileResource> resourceList,
+ boolean isSeq,
+ boolean isLatestPartition) {
+ for (TsFileResource tsFileResource : resourceList) {
+ recoverSealedTsFiles(tsFileResource, context, isSeq);
+ }
+ if (isLatestPartition && isSeq) {
+ lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId);
+ for (TsFileResource tsFileResource : resourceList) {
+ updateLastFlushTime(tsFileResource, true);
+ }
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.valueOf(dataRegionId)),
+ partitionId,
+ false,
+ Long.MAX_VALUE,
+ lastFlushTimeMap.getMemSize(partitionId),
+ true));
+ }
+ }
+
// ({systemTime}-{versionNum}-{mergeNum}.tsfile)
private int compareFileName(File o1, File o2) {
String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
@@ -854,11 +890,21 @@ public class DataRegion {
// init map
long timePartitionId = StorageEngineV2.getTimePartition(insertRowNode.getTime());
- lastFlushTimeManager.ensureFlushedTimePartition(timePartitionId);
+ if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.valueOf(dataRegionId)),
+ timePartitionId,
+ true,
+ Long.MAX_VALUE,
+ 0,
+ tsFileManager.isLatestTimePartition(timePartitionId)));
+ }
boolean isSequence =
insertRowNode.getTime()
- > lastFlushTimeManager.getFlushedTime(
+ > lastFlushTimeMap.getFlushedTime(
timePartitionId, insertRowNode.getDevicePath().getFullPath());
// is unsequence and user set config to discard out of order data
@@ -867,8 +913,6 @@ public class DataRegion {
return;
}
- lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
-
// fire trigger before insertion
// TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, insertRowNode);
// insert to sequence or unSequence file
@@ -932,9 +976,23 @@ public class DataRegion {
long beforeTimePartition =
StorageEngineV2.getTimePartition(insertTabletNode.getTimes()[before]);
// init map
+
+ if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.valueOf(dataRegionId)),
+ beforeTimePartition,
+ true,
+ Long.MAX_VALUE,
+ 0,
+ tsFileManager.isLatestTimePartition(beforeTimePartition)));
+ }
+
long lastFlushTime =
- lastFlushTimeManager.ensureFlushedTimePartitionAndInit(
- beforeTimePartition, insertTabletNode.getDevicePath().getFullPath(), Long.MIN_VALUE);
+ lastFlushTimeMap.getFlushedTime(
+ beforeTimePartition, insertTabletNode.getDevicePath().getFullPath());
+
// if is sequence
boolean isSequence = false;
while (loc < insertTabletNode.getRowCount()) {
@@ -965,7 +1023,7 @@ public class DataRegion {
&& noFailure;
}
long globalLatestFlushedTime =
- lastFlushTimeManager.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
+ lastFlushTimeMap.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime);
if (!noFailure) {
@@ -1034,15 +1092,6 @@ public class DataRegion {
return false;
}
- lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
- // try to update the latest time of the device of this tsRecord
- if (sequence) {
- lastFlushTimeManager.updateLastTime(
- timePartitionId,
- insertTabletNode.getDevicePath().getFullPath(),
- insertTabletNode.getTimes()[end - 1]);
- }
-
// check memtable size and may async try to flush the work memtable
if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
@@ -1078,12 +1127,8 @@ public class DataRegion {
tsFileProcessor.insert(insertRowNode);
- // try to update the latest time of the device of this tsRecord
- lastFlushTimeManager.updateLastTime(
- timePartitionId, insertRowNode.getDevicePath().getFullPath(), insertRowNode.getTime());
-
long globalLatestFlushTime =
- lastFlushTimeManager.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
+ lastFlushTimeMap.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
@@ -1206,6 +1251,9 @@ public class DataRegion {
if (null == res) {
// build new processor, memory control module will control the number of memtables
+ TimePartitionManager.getInstance()
+ .updateAfterOpeningTsFileProcessor(
+ new DataRegionId(Integer.valueOf(dataRegionId)), timeRangeId);
res = newTsFileProcessor(sequence, timeRangeId);
tsFileProcessorTreeMap.put(timeRangeId, res);
tsFileManager.add(res.getTsFileResource(), sequence);
@@ -1243,7 +1291,7 @@ public class DataRegion {
fsFactory.getFileWithParent(filePath),
dataRegionInfo,
this::closeUnsealedTsFileProcessorCallBack,
- this::updateLatestFlushTimeCallback,
+ this::sequenceFlushCallback,
true);
} else {
tsFileProcessor =
@@ -1335,7 +1383,6 @@ public class DataRegion {
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
if (sequence) {
closingSequenceTsFileProcessor.add(tsFileProcessor);
- updateEndTimeMap(tsFileProcessor);
tsFileProcessor.asyncClose();
workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
@@ -1414,9 +1461,8 @@ public class DataRegion {
this.workSequenceTsFileProcessors.clear();
this.workUnsequenceTsFileProcessors.clear();
this.tsFileManager.clear();
- lastFlushTimeManager.clearFlushedTime();
- lastFlushTimeManager.clearGlobalFlushedTime();
- lastFlushTimeManager.clearLastTime();
+ lastFlushTimeMap.clearFlushedTime();
+ lastFlushTimeMap.clearGlobalFlushedTime();
} finally {
writeUnlock();
}
@@ -2029,61 +2075,44 @@ public class DataRegion {
}
}
- /**
- * when close an TsFileProcessor, update its EndTimeMap immediately
- *
- * @param tsFileProcessor processor to be closed
- */
- private void updateEndTimeMap(TsFileProcessor tsFileProcessor) {
- TsFileResource resource = tsFileProcessor.getTsFileResource();
- for (String deviceId : resource.getDevices()) {
- resource.updateEndTime(
- deviceId, lastFlushTimeManager.getLastTime(tsFileProcessor.getTimeRangeId(), deviceId));
- }
- }
-
- private boolean unsequenceFlushCallback(TsFileProcessor processor) {
+ private boolean unsequenceFlushCallback(
+ TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
+ TimePartitionManager.getInstance()
+ .updateAfterFlushing(
+ new DataRegionId(Integer.valueOf(dataRegionId)),
+ processor.getTimeRangeId(),
+ systemFlushTime,
+ lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
+ workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
return true;
}
- private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
- boolean res = lastFlushTimeManager.updateLatestFlushTime(processor.getTimeRangeId());
+ private boolean sequenceFlushCallback(
+ TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
+ boolean res = lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap);
if (!res) {
logger.warn(
"Partition: {} does't have latest time for each device. "
+ "No valid record is written into memtable. Flushing tsfile is: {}",
processor.getTimeRangeId(),
processor.getTsFileResource().getTsFile());
+ return res;
}
- return res;
- }
-
- /**
- * update latest flush time for partition id
- *
- * @param partitionId partition id
- * @param latestFlushTime lastest flush time
- * @return true if update latest flush time success
- */
- private boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
- boolean res =
- lastFlushTimeManager.updateLatestFlushTimeToPartition(partitionId, latestFlushTime);
- if (!res) {
- logger.warn(
- "Partition: {} does't have latest time for each device. "
- + "No valid record is written into memtable. latest flush time is: {}",
- partitionId,
- latestFlushTime);
- }
-
+ TimePartitionManager.getInstance()
+ .updateAfterFlushing(
+ new DataRegionId(Integer.valueOf(dataRegionId)),
+ processor.getTimeRangeId(),
+ systemFlushTime,
+ lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
+ workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
return res;
}
/** used for upgrading */
public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
long partitionId, String deviceId, long time) {
- lastFlushTimeManager.updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ lastFlushTimeMap.updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
partitionId, deviceId, time);
}
@@ -2163,7 +2192,7 @@ public class DataRegion {
writeUnlock();
}
// after upgrade complete, update partitionLatestFlushedTimeForEachDevice
- lastFlushTimeManager.applyNewlyFlushedTimeToFlushedTime();
+ lastFlushTimeMap.applyNewlyFlushedTimeToFlushedTime();
}
}
@@ -2579,9 +2608,8 @@ public class DataRegion {
for (String device : newTsFileResource.getDevices()) {
long endTime = newTsFileResource.getEndTime(device);
long timePartitionId = StorageEngineV2.getTimePartition(endTime);
- lastFlushTimeManager.updateLastTime(timePartitionId, device, endTime);
- lastFlushTimeManager.updateFlushedTime(timePartitionId, device, endTime);
- lastFlushTimeManager.updateGlobalFlushedTime(device, endTime);
+ lastFlushTimeMap.updateFlushedTime(timePartitionId, device, endTime);
+ lastFlushTimeMap.updateGlobalFlushedTime(device, endTime);
}
}
@@ -3006,13 +3034,15 @@ public class DataRegion {
iterator.hasNext(); ) {
Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next();
long partitionId = longTsFileProcessorEntry.getKey();
+ lastFlushTimeMap.removePartition(partitionId);
+ TimePartitionManager.getInstance()
+ .removePartition(new DataRegionId(Integer.valueOf(dataRegionId)), partitionId);
TsFileProcessor processor = longTsFileProcessorEntry.getValue();
if (filter.satisfy(storageGroupName, partitionId)) {
processor.syncClose();
iterator.remove();
processor.getTsFileResource().remove();
tsFileManager.remove(processor.getTsFileResource(), sequence);
- updateLatestFlushTimeToPartition(partitionId, Long.MIN_VALUE);
logger.debug(
"{} is removed during deleting partitions",
processor.getTsFileResource().getTsFilePath());
@@ -3028,7 +3058,10 @@ public class DataRegion {
if (filter.satisfy(storageGroupName, tsFileResource.getTimePartition())) {
tsFileResource.remove();
tsFileManager.remove(tsFileResource, sequence);
- updateLatestFlushTimeToPartition(tsFileResource.getTimePartition(), Long.MIN_VALUE);
+ lastFlushTimeMap.removePartition(tsFileResource.getTimePartition());
+ TimePartitionManager.getInstance()
+ .removePartition(
+ new DataRegionId(Integer.valueOf(dataRegionId)), tsFileResource.getTimePartition());
logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath());
}
}
@@ -3072,14 +3105,25 @@ public class DataRegion {
// init map
long timePartitionId = StorageEngineV2.getTimePartition(insertRowNode.getTime());
- lastFlushTimeManager.ensureFlushedTimePartition(timePartitionId);
+ if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.valueOf(dataRegionId)),
+ timePartitionId,
+ true,
+ Long.MAX_VALUE,
+ 0,
+ tsFileManager.isLatestTimePartition(timePartitionId)));
+ }
+
// As the plans have been ordered and we hold the write lock,
// if a plan is sequenced, then all the remaining plans are sequenced.
//
if (!isSequence) {
isSequence =
insertRowNode.getTime()
- > lastFlushTimeManager.getFlushedTime(
+ > lastFlushTimeMap.getFlushedTime(
timePartitionId, insertRowNode.getDevicePath().getFullPath());
}
// is unsequence and user set config to discard out of order data
@@ -3088,8 +3132,6 @@ public class DataRegion {
return;
}
- lastFlushTimeManager.ensureLastTimePartition(timePartitionId);
-
// fire trigger before insertion
// TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, plan);
// insert to sequence or unSequence file
@@ -3251,7 +3293,7 @@ public class DataRegion {
/**
 * Callback taking a TsFileProcessor, a device -> time map and a system flush time. Presumably
 * implemented by sequenceFlushCallback / unsequenceFlushCallback, whose signatures match --
 * TODO confirm at the call sites.
 */
@FunctionalInterface
public interface UpdateEndTimeCallBack {
- boolean call(TsFileProcessor caller);
/**
 * @param caller the processor invoking the callback
 * @param updateMap device -> time map handed over by the processor
 * @param systemFlushTime flush timestamp handed over by the processor
 * @return implementation-defined success flag
 */
+ boolean call(TsFileProcessor caller, Map<String, Long> updateMap, long systemFlushTime);
}
@FunctionalInterface
@@ -3336,13 +3378,17 @@ public class DataRegion {
}
}
/**
 * Drops the flush-time information kept for one time partition by delegating to
 * lastFlushTimeMap.removePartition. Unlike the partition-deletion paths in this class, it does
 * not call TimePartitionManager.removePartition.
 *
 * @param timePartitionId id of the time partition whose flush-time map is released
 */
+ public void releaseFlushTimeMap(long timePartitionId) {
+ lastFlushTimeMap.removePartition(timePartitionId);
+ }
+
public long getMemCost() {
return dataRegionInfo.getMemCost();
}
@TestOnly
- public ILastFlushTimeManager getLastFlushTimeManager() {
- return lastFlushTimeManager;
+ public ILastFlushTimeMap getLastFlushTimeMap() {
+ return lastFlushTimeMap;
}
@TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
index f01139166b..fc89a2f125 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegionInfo.java
@@ -41,7 +41,7 @@ public class DataRegionInfo {
(long)
(IoTDBDescriptor.getInstance().getConfig().getWriteMemoryVariationReportProportion()
* IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForStorageEngine()
- * IoTDBDescriptor.getInstance().getConfig().getWriteProportion());
+ * IoTDBDescriptor.getInstance().getConfig().getWriteProportionForMemtable());
private final AtomicLong lastReportedSize = new AtomicLong();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
similarity index 52%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index f16f3181f8..b110cd6768 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -23,22 +23,30 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-/**
- * This class manages last time and flush time for sequence and unsequence determination This class
- * This class is NOT thread safe, caller should ensure synchronization
- */
-public class LastFlushTimeManager implements ILastFlushTimeManager {
- private static final Logger logger = LoggerFactory.getLogger(LastFlushTimeManager.class);
- /*
- * time partition id -> map, which contains
- * device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
- * changes upon timestamps of each device, and is used to update partitionLatestFlushedTimeForEachDevice
- * when a flush is issued.
+public class HashLastFlushTimeMap implements ILastFlushTimeMap {
+
+ private static final Logger logger = LoggerFactory.getLogger(HashLastFlushTimeMap.class);
+
+ /**
+ * String basic total, 40B
+ *
+ * <ul>
+ * <li>Object header: Mark Word + Classic Pointer, 12B
+ * <li>char[] reference 4B
+ * <li>hash code, 4B
+ * <li>padding 4B
+ * <li>char[] header + length 16B
+ * </ul>
*/
- private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
+ long STRING_BASE_SIZE = 40;
+
+ long LONG_SIZE = 24;
+
+ long HASHMAP_NODE_BASIC_SIZE = 14 + STRING_BASE_SIZE + LONG_SIZE;
+
/**
* time partition id -> map, which contains device -> largest timestamp of the latest memtable to
* be submitted to asyncTryToFlush partitionLatestFlushedTimeForEachDevice determines whether a
@@ -58,31 +66,46 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
*/
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
- // region set
- @Override
- public void setMultiDeviceLastTime(long timePartitionId, Map<String, Long> lastTimeMap) {
- latestTimeForEachDevice
- .computeIfAbsent(timePartitionId, l -> new HashMap<>())
- .putAll(lastTimeMap);
- }
+ /** used for recovering flush time from tsfile resource */
+ TsFileManager tsFileManager;
- @Override
- public void setOneDeviceLastTime(long timePartitionId, String path, long time) {
- latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()).put(path, time);
+ /** record memory cost of map for each partitionId */
+ private Map<Long, Long> memCostForEachPartition = new HashMap<>();
+
+ public HashLastFlushTimeMap(TsFileManager tsFileManager) {
+ this.tsFileManager = tsFileManager;
}
/**
 * Overwrites the flushed time of every device in {@code flushedTimeMap} for one time partition.
 * Does nothing when no flush-time map exists for the partition (the map is not created here).
 * For each device that had no previous entry, HASHMAP_NODE_BASIC_SIZE plus twice the device
 * name length is added to the memory cost recorded for the partition.
 *
 * @param timePartitionId id of the time partition to update
 * @param flushedTimeMap device -> flushed time values to store
 */
@Override
public void setMultiDeviceFlushedTime(long timePartitionId, Map<String, Long> flushedTimeMap) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, l -> new HashMap<>())
- .putAll(flushedTimeMap);
+ Map<String, Long> flushTimeMapForPartition =
+ partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
+ if (flushTimeMapForPartition == null) {
+ return;
+ }
+ long memIncr = 0;
+ for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
// put() returns null only when the device was not present yet, i.e. a new node was created
+ if (flushTimeMapForPartition.put(entry.getKey(), entry.getValue()) == null) {
+ memIncr += HASHMAP_NODE_BASIC_SIZE + 2L * entry.getKey().length();
+ }
+ }
// effectively-final copy so the accumulated value can be captured by the lambda
+ long finalMemIncr = memIncr;
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 + finalMemIncr);
}
/**
 * Overwrites the flushed time of a single device in one time partition. Does nothing when no
 * flush-time map exists for the partition. If the device had no previous entry,
 * HASHMAP_NODE_BASIC_SIZE plus twice the path length is added to the partition's memory cost.
 *
 * @param timePartitionId id of the time partition to update
 * @param path device path used as the map key
 * @param time flushed time to store
 */
@Override
public void setOneDeviceFlushedTime(long timePartitionId, String path, long time) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, l -> new HashMap<>())
- .put(path, time);
+ Map<String, Long> flushTimeMapForPartition =
+ partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
+ if (flushTimeMapForPartition == null) {
+ return;
+ }
// put() returns null only when the device was not present yet, i.e. a new node was created
+ if (flushTimeMapForPartition.put(path, time) == null) {
+ long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * path.length();
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
+ }
}
@Override
@@ -95,22 +118,24 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
globalLatestFlushedTimeForEachDevice.put(path, time);
}
- // endregion
-
- // region update
-
- @Override
- public void updateLastTime(long timePartitionId, String path, long time) {
- latestTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .compute(path, (k, v) -> v == null ? time : Math.max(v, time));
- }
-
/**
 * Raises the flushed time of a device in one time partition to {@code max(current, time)}, or
 * stores {@code time} if the device has no entry yet. Does nothing when no flush-time map exists
 * for the partition. A newly created entry adds HASHMAP_NODE_BASIC_SIZE plus twice the path
 * length to the partition's memory cost.
 *
 * @param timePartitionId id of the time partition to update
 * @param path device path used as the map key
 * @param time candidate flushed time
 */
@Override
public void updateFlushedTime(long timePartitionId, String path, long time) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .compute(path, (k, v) -> v == null ? time : Math.max(v, time));
+ Map<String, Long> flushTimeMapForPartition =
+ partitionLatestFlushedTimeForEachDevice.get(timePartitionId);
+ if (flushTimeMapForPartition == null) {
+ return;
+ }
+ flushTimeMapForPartition.compute(
+ path,
+ (k, v) -> {
// v == null means the device is new to this partition: account for the new node
+ if (v == null) {
+ long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * path.length();
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
+ return time;
+ }
+ return Math.max(v, time);
+ });
}
@Override
@@ -127,39 +152,23 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
.compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
}
- // endregion
-
- // region ensure
-
/**
 * Ensures that a (possibly empty) flush-time map exists for the given time partition.
 *
 * @param timePartitionId id of the time partition
 * @return {@code true} if the partition already had a map; {@code false} if an empty map was
 *     created by this call
 */
@Override
- public void ensureLastTimePartition(long timePartitionId) {
- latestTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
- }
-
- @Override
- public void ensureFlushedTimePartition(long timePartitionId) {
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
- }
-
- @Override
- public long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime) {
- return partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .computeIfAbsent(path, id -> initTime);
+ public boolean checkAndCreateFlushedTimePartition(long timePartitionId) {
+ if (!partitionLatestFlushedTimeForEachDevice.containsKey(timePartitionId)) {
+ partitionLatestFlushedTimeForEachDevice.put(timePartitionId, new HashMap<>());
+ return false;
+ }
+ return true;
}
- // endregion
-
- // region upgrade support methods
-
@Override
public void applyNewlyFlushedTimeToFlushedTime() {
- for (Entry<Long, Map<String, Long>> entry :
+ for (Map.Entry<Long, Map<String, Long>> entry :
newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
long timePartitionId = entry.getKey();
Map<String, Long> latestFlushTimeForPartition =
partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
- for (Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
+ for (Map.Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
String device = endTimeMap.getKey();
long endTime = endTimeMap.getValue();
if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
@@ -171,49 +180,9 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
}
}
- /**
- * update latest flush time for partition id
- *
- * @param partitionId partition id
- * @param latestFlushTime lastest flush time
- * @return true if update latest flush time success
- */
- @Override
- public boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
- Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
-
- if (curPartitionDeviceLatestTime == null) {
- return false;
- }
-
- for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
- // set lastest flush time to latestTimeForEachDevice
- entry.setValue(latestFlushTime);
-
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- newlyFlushedPartitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
- < entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
- }
- }
- return true;
- }
-
@Override
- public boolean updateLatestFlushTime(long partitionId) {
- // update the largest timestamp in the last flushing memtable
- Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
-
- if (curPartitionDeviceLatestTime == null) {
- return false;
- }
-
- for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ public boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
+ for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(partitionId, id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
@@ -227,19 +196,11 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
return true;
}
- // endregion
-
- // region query
/**
 * Returns the flushed time cached for {@code path} in the given time partition. On a cache miss
 * the value produced by recoverFlushTime is stored in the partition's map and returned.
 *
 * <p>NOTE(review): the partition's map is dereferenced without a null check, so this throws
 * NullPointerException when no map exists for {@code timePartitionId} (for example after
 * removePartition). Callers presumably must call checkAndCreateFlushedTimePartition first --
 * confirm for every call site.
 *
 * @param timePartitionId id of the time partition
 * @param path device path
 * @return cached or recovered flushed time of the device
 */
@Override
public long getFlushedTime(long timePartitionId, String path) {
return partitionLatestFlushedTimeForEachDevice
.get(timePartitionId)
- .getOrDefault(path, Long.MIN_VALUE);
- }
-
- @Override
- public long getLastTime(long timePartitionId, String path) {
- return latestTimeForEachDevice.get(timePartitionId).getOrDefault(path, Long.MIN_VALUE);
+ .computeIfAbsent(path, k -> recoverFlushTime(timePartitionId, path));
}
@Override
@@ -247,14 +208,6 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
return globalLatestFlushedTimeForEachDevice.getOrDefault(path, Long.MIN_VALUE);
}
- // endregion
-
- // region clear
- @Override
- public void clearLastTime() {
- latestTimeForEachDevice.clear();
- }
-
@Override
public void clearFlushedTime() {
partitionLatestFlushedTimeForEachDevice.clear();
@@ -264,5 +217,33 @@ public class LastFlushTimeManager implements ILastFlushTimeManager {
public void clearGlobalFlushedTime() {
globalLatestFlushedTimeForEachDevice.clear();
}
- // endregion
+
/**
 * Removes both the device -> flushed time map and the recorded memory cost of one time
 * partition. The global (partition-independent) flushed times are left untouched.
 *
 * @param partitionId id of the time partition to forget
 */
+ @Override
+ public void removePartition(long partitionId) {
+ partitionLatestFlushedTimeForEachDevice.remove(partitionId);
+ memCostForEachPartition.remove(partitionId);
+ }
+
+ /**
+ * Recovers the flushed time of a device in one time partition from the sequence TsFile
+ * resources, scanning the resource list from its last element to its first.
+ *
+ * <p>getFlushedTime caches whatever this method returns as a new map entry, so the memory cost
+ * of that entry is recorded on every path -- previously it was recorded only when no resource
+ * contained the device, which under-counted the partition's memory usage.
+ *
+ * @param partitionId id of the time partition to search
+ * @param devicePath device path to look up
+ * @return end time of the device in the last listed resource that may contain it, or {@link
+ *     Long#MIN_VALUE} when no resource does
+ */
+ private long recoverFlushTime(long partitionId, String devicePath) {
+ List<TsFileResource> tsFileResourceList =
+ tsFileManager.getSequenceListByTimePartition(partitionId);
+
+ long flushTime = Long.MIN_VALUE;
+ for (int i = tsFileResourceList.size() - 1; i >= 0; i--) {
+ TsFileResource resource = tsFileResourceList.get(i);
+ if (resource.timeIndex.mayContainsDevice(devicePath)) {
+ flushTime = resource.timeIndex.getEndTime(devicePath);
+ break;
+ }
+ }
+
+ // account for the map node the caller is about to create, whichever value was found
+ long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * devicePath.length();
+ memCostForEachPartition.merge(partitionId, memCost, Long::sum);
+ return flushTime;
+ }
+
/**
 * Returns the memory cost recorded for one time partition.
 *
 * @param partitionId id of the time partition
 * @return the recorded cost, or 0 when nothing has been recorded for the partition
 */
+ @Override
+ public long getMemSize(long partitionId) {
+ if (memCostForEachPartition.containsKey(partitionId)) {
+ return memCostForEachPartition.get(partitionId);
+ }
+ return 0;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
similarity index 51%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 61388ce66f..8e690290cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -22,43 +22,51 @@ package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+
+public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
+
+ long LONG_SIZE = 24;
+
+ long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE + LONG_SIZE;
-/**
- * This class manages last time and flush time for sequence and unsequence determination This class
- * This class is NOT thread safe, caller should ensure synchronization This class not support
- * upgrade
- */
-public class IDTableFlushTimeManager implements ILastFlushTimeManager {
IDTable idTable;
- public IDTableFlushTimeManager(IDTable idTable) {
- this.idTable = idTable;
- }
+ TsFileManager tsFileManager;
- // region set
- @Override
- public void setMultiDeviceLastTime(long timePartitionId, Map<String, Long> lastTimeMap) {
- for (Map.Entry<String, Long> entry : lastTimeMap.entrySet()) {
- idTable.getDeviceEntry(entry.getKey()).putLastTimeMap(timePartitionId, entry.getValue());
- }
- }
+ /** record memory cost of map for each partitionId */
+ private Map<Long, Long> memCostForEachPartition = new HashMap<>();
- @Override
- public void setOneDeviceLastTime(long timePartitionId, String path, long time) {
- idTable.getDeviceEntry(path).putLastTimeMap(timePartitionId, time);
+ private Set<Long> partitionSet = new HashSet<>();
+
+ public IDTableLastFlushTimeMap(IDTable idTable, TsFileManager tsFileManager) {
+ this.idTable = idTable;
+ this.tsFileManager = tsFileManager;
}
/**
 * Stores the flushed time of every device in {@code flushedTimeMap} on its IDTable device entry
 * for the given time partition. Whenever putFlushTimeMap returns null (treated here as "no
 * previous value"), HASHMAP_NODE_BASIC_SIZE is added to the partition's recorded memory cost.
 *
 * @param timePartitionId id of the time partition to update
 * @param flushedTimeMap device -> flushed time values to store
 */
@Override
public void setMultiDeviceFlushedTime(long timePartitionId, Map<String, Long> flushedTimeMap) {
for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
- idTable.getDeviceEntry(entry.getKey()).putFlushTimeMap(timePartitionId, entry.getValue());
+ if (idTable.getDeviceEntry(entry.getKey()).putFlushTimeMap(timePartitionId, entry.getValue())
+ == null) {
+ memCostForEachPartition.compute(
+ timePartitionId,
+ (k, v) -> v == null ? HASHMAP_NODE_BASIC_SIZE : v + HASHMAP_NODE_BASIC_SIZE);
+ }
}
}
/**
 * Stores the flushed time of one device on its IDTable device entry for the given time
 * partition. When putFlushTimeMap returns null (treated here as "no previous value"),
 * HASHMAP_NODE_BASIC_SIZE is added to the partition's recorded memory cost.
 *
 * @param timePartitionId id of the time partition to update
 * @param path device path used to look up the device entry
 * @param time flushed time to store
 */
@Override
public void setOneDeviceFlushedTime(long timePartitionId, String path, long time) {
- idTable.getDeviceEntry(path).putFlushTimeMap(timePartitionId, time);
+ if (idTable.getDeviceEntry(path).putFlushTimeMap(timePartitionId, time) == null) {
+ memCostForEachPartition.compute(
+ timePartitionId,
+ (k, v) -> v == null ? HASHMAP_NODE_BASIC_SIZE : v + HASHMAP_NODE_BASIC_SIZE);
+ }
}
@Override
@@ -73,17 +81,13 @@ public class IDTableFlushTimeManager implements ILastFlushTimeManager {
idTable.getDeviceEntry(path).setGlobalFlushTime(time);
}
- // endregion
-
- // region update
-
- @Override
- public void updateLastTime(long timePartitionId, String path, long time) {
- idTable.getDeviceEntry(path).updateLastTimeMap(timePartitionId, time);
- }
-
/**
 * Updates the flushed time of one device for the given time partition through the device
 * entry's updateFlushTimeMap. If the entry has no flush time for the partition yet,
 * HASHMAP_NODE_BASIC_SIZE is first added to the partition's recorded memory cost.
 *
 * @param timePartitionId id of the time partition to update
 * @param path device path used to look up the device entry
 * @param time flushed time passed to updateFlushTimeMap
 */
@Override
public void updateFlushedTime(long timePartitionId, String path, long time) {
// a null flush time means the update below creates a new per-partition entry
+ if (idTable.getDeviceEntry(path).getFlushTime(timePartitionId) == null) {
+ memCostForEachPartition.compute(
+ timePartitionId,
+ (k, v) -> v == null ? HASHMAP_NODE_BASIC_SIZE : v + HASHMAP_NODE_BASIC_SIZE);
+ }
idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, time);
}
@@ -98,110 +102,87 @@ public class IDTableFlushTimeManager implements ILastFlushTimeManager {
throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade");
}
- // endregion
-
- // region ensure
-
- @Override
- public void ensureLastTimePartition(long timePartitionId) {
- // do nothing is correct
- }
-
/**
 * Records the time partition id in partitionSet if it is not there yet.
 *
 * @param timePartitionId id of the time partition
 * @return {@code true} if the partition was already recorded; {@code false} if it was added by
 *     this call
 */
@Override
- public void ensureFlushedTimePartition(long timePartitionId) {
- // do nothing is correct
+ public boolean checkAndCreateFlushedTimePartition(long timePartitionId) {
+ return !partitionSet.add(timePartitionId);
}
- @Override
- public long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime) {
- return idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, initTime);
- }
-
- // endregion
-
- // region upgrade support methods
-
@Override
public void applyNewlyFlushedTimeToFlushedTime() {
throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade");
}
- /**
- * update latest flush time for partition id
- *
- * @param partitionId partition id
- * @param latestFlushTime lastest flush time
- * @return true if update latest flush time success
- */
@Override
- public boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
- for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
- deviceEntry.putLastTimeMap(partitionId, latestFlushTime);
- deviceEntry.putFlushTimeMap(partitionId, latestFlushTime);
- deviceEntry.updateGlobalFlushTime(latestFlushTime);
+ public boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
+ for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
+ DeviceEntry deviceEntry = idTable.getDeviceEntry(entry.getKey());
+ deviceEntry.updateFlushTimeMap(partitionId, entry.getValue());
+ if (deviceEntry.getGlobalFlushTime() < entry.getValue()) {
+ deviceEntry.setGlobalFlushTime(entry.getValue());
+ }
}
-
return true;
}
@Override
- public boolean updateLatestFlushTime(long partitionId) {
- boolean updated = false;
-
- for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
- Long lastTime = deviceEntry.getLastTime(partitionId);
- if (lastTime == null) {
- continue;
- }
-
- updated = true;
- deviceEntry.putFlushTimeMap(partitionId, lastTime);
- deviceEntry.updateGlobalFlushTime(lastTime);
+ public long getFlushedTime(long timePartitionId, String path) {
+ Long flushTime = idTable.getDeviceEntry(path).getFlushTime(timePartitionId);
+ if (flushTime != null) {
+ return flushTime;
}
-
- return updated;
+ long time = recoverFlushTime(timePartitionId, path);
+ idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, time);
+ return time;
}
- // endregion
-
- // region query
@Override
- public long getFlushedTime(long timePartitionId, String path) {
- return idTable.getDeviceEntry(path).getFLushTimeWithDefaultValue(timePartitionId);
+ public long getGlobalFlushedTime(String path) {
+ return idTable.getDeviceEntry(path).getGlobalFlushTime();
}
@Override
- public long getLastTime(long timePartitionId, String path) {
- return idTable.getDeviceEntry(path).getLastTimeWithDefaultValue(timePartitionId);
+ public void clearFlushedTime() {
+ for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+ deviceEntry.clearFlushTime();
+ }
}
@Override
- public long getGlobalFlushedTime(String path) {
- return idTable.getDeviceEntry(path).getGlobalFlushTime();
+ public void clearGlobalFlushedTime() {
+ for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+ deviceEntry.setGlobalFlushTime(Long.MIN_VALUE);
+ }
}
- // endregion
-
- // region clear
@Override
- public void clearLastTime() {
+ public void removePartition(long partitionId) {
for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
- deviceEntry.clearLastTime();
+ deviceEntry.removePartition(partitionId);
}
+ partitionSet.remove(partitionId);
+ memCostForEachPartition.remove(partitionId);
}
- @Override
- public void clearFlushedTime() {
- for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
- deviceEntry.clearFlushTime();
+ /**
+ * Recovers the flushed time of a device in one time partition from the sequence TsFile
+ * resources, scanning the resource list from its last element to its first.
+ *
+ * <p>getFlushedTime stores whatever this method returns on the device entry, so the memory cost
+ * of that entry is recorded on every path -- previously it was recorded only when no resource
+ * contained the device, which under-counted the partition's memory usage.
+ *
+ * @param partitionId id of the time partition to search
+ * @param devicePath device path to look up
+ * @return end time of the device in the last listed resource that may contain it, or {@link
+ *     Long#MIN_VALUE} when no resource does
+ */
+ private long recoverFlushTime(long partitionId, String devicePath) {
+ List<TsFileResource> tsFileResourceList =
+ tsFileManager.getSequenceListByTimePartition(partitionId);
+
+ long flushTime = Long.MIN_VALUE;
+ for (int i = tsFileResourceList.size() - 1; i >= 0; i--) {
+ TsFileResource resource = tsFileResourceList.get(i);
+ if (resource.timeIndex.mayContainsDevice(devicePath)) {
+ flushTime = resource.timeIndex.getEndTime(devicePath);
+ break;
+ }
}
+
+ // account for the entry the caller is about to create, whichever value was found
+ memCostForEachPartition.merge(partitionId, HASHMAP_NODE_BASIC_SIZE, Long::sum);
+ return flushTime;
}
/**
 * Returns the memory cost recorded for one time partition.
 *
 * @param partitionId id of the time partition
 * @return the recorded cost, or 0 when nothing has been recorded for the partition
 */
@Override
- public void clearGlobalFlushedTime() {
- for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
- deviceEntry.setGlobalFlushTime(Long.MIN_VALUE);
+ public long getMemSize(long partitionId) {
+ if (memCostForEachPartition.containsKey(partitionId)) {
+ return memCostForEachPartition.get(partitionId);
}
+ return 0;
}
- // endregion
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
similarity index 67%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
index 23b988aaf7..a98dd06e6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,18 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.engine.storagegroup;
import java.util.Map;
/** This interface manages the flushed time of each device for sequence and unsequence determination */
-public interface ILastFlushTimeManager {
+public interface ILastFlushTimeMap {
// region set
- void setMultiDeviceLastTime(long timePartitionId, Map<String, Long> lastTimeMap);
-
- void setOneDeviceLastTime(long timePartitionId, String path, long time);
-
void setMultiDeviceFlushedTime(long timePartitionId, Map<String, Long> flushedTimeMap);
void setOneDeviceFlushedTime(long timePartitionId, String path, long time);
@@ -38,7 +35,6 @@ public interface ILastFlushTimeManager {
// endregion
// region update
- void updateLastTime(long timePartitionId, String path, long time);
void updateFlushedTime(long timePartitionId, String path, long time);
@@ -49,41 +45,29 @@ public interface ILastFlushTimeManager {
// endregion
// region ensure
- void ensureLastTimePartition(long timePartitionId);
+ boolean checkAndCreateFlushedTimePartition(long timePartitionId);
- void ensureFlushedTimePartition(long timePartitionId);
-
- long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime);
// endregion
// region support upgrade methods
void applyNewlyFlushedTimeToFlushedTime();
- /**
- * update latest flush time for partition id
- *
- * @param partitionId partition id
- * @param latestFlushTime lastest flush time
- * @return true if update latest flush time success
- */
- boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime);
-
- boolean updateLatestFlushTime(long partitionId);
+ boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
// endregion
// region query
long getFlushedTime(long timePartitionId, String path);
- long getLastTime(long timePartitionId, String path);
-
long getGlobalFlushedTime(String path);
// endregion
// region clear
- void clearLastTime();
-
void clearFlushedTime();
void clearGlobalFlushedTime();
// endregion
+
+ void removePartition(long partitionId);
+
+ long getMemSize(long partitionId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionInfo.java
new file mode 100644
index 0000000000..4d01a573ad
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+
+/** Time partition info records necessary info of a time partition for a data region */
+public class TimePartitionInfo {
// id of the data region this time partition belongs to
+ DataRegionId dataRegionId;
+
// id of the time partition inside that data region
+ long partitionId;
+
// first comparison key of comparePriority; presumably true while a working TsFileProcessor
// still exists for the partition -- TODO confirm against TimePartitionManager
+ boolean isActive;
+
// third comparison key of comparePriority; DataRegion registers new partitions with
// Long.MAX_VALUE here
+ long lastSystemFlushTime;
+
// second comparison key of comparePriority; DataRegion fills it from
// tsFileManager.isLatestTimePartition(partitionId) at registration
+ boolean isLatestPartition;
+
// presumably the memory used by the partition's flush-time map -- TODO confirm unit and source
+ long memSize;
+
/**
 * Plain field-by-field constructor. Note that {@code memsize} precedes {@code
 * isLatestPartition} in the parameter list although the fields are declared in the opposite
 * order.
 */
+ public TimePartitionInfo(
+ DataRegionId dataRegionId,
+ long partitionId,
+ boolean isActive,
+ long lastSystemFlushTime,
+ long memsize,
+ boolean isLatestPartition) {
+ this.dataRegionId = dataRegionId;
+ this.partitionId = partitionId;
+ this.isActive = isActive;
+ this.lastSystemFlushTime = lastSystemFlushTime;
+ this.memSize = memsize;
+ this.isLatestPartition = isLatestPartition;
+ }
+
/**
 * Three-level ordering: inactive before active, then non-latest before latest, then smaller
 * lastSystemFlushTime first. memSize, dataRegionId and partitionId take no part in it.
 * Presumably entries that compare lower are released first -- TODO confirm against
 * TimePartitionManager.
 *
 * @param timePartitionInfo the info to compare this one with
 * @return a negative, zero or positive value when this info orders before, equal to or after
 *     the argument
 */
+ public int comparePriority(TimePartitionInfo timePartitionInfo) {
+ int cmp = Boolean.compare(isActive, timePartitionInfo.isActive);
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ cmp = Boolean.compare(isLatestPartition, timePartitionInfo.isLatestPartition);
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return Long.compare(lastSystemFlushTime, timePartitionInfo.lastSystemFlushTime);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
new file mode 100644
index 0000000000..1e26a46b8a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Manage all the time partitions for all data regions and control the total memory of them.
+ *
+ * <p>The partition map and {@code memCost} are only touched while holding the monitor of {@code
+ * timePartitionInfoMap}.
+ */
+public class TimePartitionManager {
+  /** data region id -> (time partition id -> info); every inner map is created as a TreeMap. */
+  final Map<DataRegionId, Map<Long, TimePartitionInfo>> timePartitionInfoMap;
+
+  /** Running total of the memory sizes reported through {@link #updateAfterFlushing}. */
+  long memCost = 0;
+
+  /** Once {@code memCost} exceeds this value after a flush, partitions are evicted. */
+  long timePartitionInfoMemoryThreshold =
+      IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForTimePartitionInfo();
+
+  private TimePartitionManager() {
+    timePartitionInfoMap = new HashMap<>();
+  }
+
+  /**
+   * Register a time partition. The existing partition of the same data region with the greatest
+   * id that is not larger than the new one (if any) is marked as no longer the latest.
+   *
+   * <p>NOTE(review): the memSize of the registered info is not added to memCost here - confirm
+   * that callers rely on {@link #updateAfterFlushing} for the accounting.
+   *
+   * @param timePartitionInfo the partition to register
+   */
+  public void registerTimePartitionInfo(TimePartitionInfo timePartitionInfo) {
+    synchronized (timePartitionInfoMap) {
+      // safe cast: inner maps are only ever created as TreeMap inside this class
+      TreeMap<Long, TimePartitionInfo> timePartitionInfoMapForRegion =
+          (TreeMap<Long, TimePartitionInfo>)
+              timePartitionInfoMap.computeIfAbsent(
+                  timePartitionInfo.dataRegionId, k -> new TreeMap<>());
+
+      Map.Entry<Long, TimePartitionInfo> entry =
+          timePartitionInfoMapForRegion.floorEntry(timePartitionInfo.partitionId);
+      if (entry != null) {
+        entry.getValue().isLatestPartition = false;
+      }
+
+      timePartitionInfoMapForRegion.put(timePartitionInfo.partitionId, timePartitionInfo);
+    }
+  }
+
+  /**
+   * Record the result of a flush for a registered partition and evict partitions if the total
+   * memory cost exceeds the threshold afterwards. Unregistered partitions are ignored.
+   *
+   * @param dataRegionId data region of the partition
+   * @param timePartitionId id of the partition
+   * @param systemFlushTime new last system flush time of the partition
+   * @param memSize new memory size of the partition
+   * @param isActive new active flag of the partition
+   */
+  public void updateAfterFlushing(
+      DataRegionId dataRegionId,
+      long timePartitionId,
+      long systemFlushTime,
+      long memSize,
+      boolean isActive) {
+    synchronized (timePartitionInfoMap) {
+      // computeIfAbsent also leaves an empty map behind for a region that was unknown so far
+      TimePartitionInfo timePartitionInfo =
+          timePartitionInfoMap
+              .computeIfAbsent(dataRegionId, k -> new TreeMap<>())
+              .get(timePartitionId);
+      if (timePartitionInfo != null) {
+        timePartitionInfo.lastSystemFlushTime = systemFlushTime;
+        // only the delta against the previously recorded size changes the total
+        memCost += memSize - timePartitionInfo.memSize;
+        timePartitionInfo.memSize = memSize;
+        timePartitionInfo.isActive = isActive;
+        if (memCost > timePartitionInfoMemoryThreshold) {
+          evictOldPartition();
+        }
+      }
+    }
+  }
+
+  /**
+   * Mark a registered partition as active. Unregistered partitions are ignored.
+   *
+   * @param dataRegionId data region of the partition
+   * @param timePartitionId id of the partition
+   */
+  public void updateAfterOpeningTsFileProcessor(DataRegionId dataRegionId, long timePartitionId) {
+    synchronized (timePartitionInfoMap) {
+      TimePartitionInfo timePartitionInfo =
+          timePartitionInfoMap
+              .computeIfAbsent(dataRegionId, k -> new TreeMap<>())
+              .get(timePartitionId);
+      if (timePartitionInfo != null) {
+        timePartitionInfo.isActive = true;
+      }
+    }
+  }
+
+  /**
+   * Evict partitions in ascending priority order until the memory cost is within the threshold
+   * or no registered partition is left.
+   */
+  private void evictOldPartition() {
+    synchronized (timePartitionInfoMap) {
+      TreeSet<TimePartitionInfo> candidates =
+          new TreeSet<>(TimePartitionManager::compareForEviction);
+      for (Map<Long, TimePartitionInfo> partitionsOfRegion : timePartitionInfoMap.values()) {
+        candidates.addAll(partitionsOfRegion.values());
+      }
+
+      while (memCost > timePartitionInfoMemoryThreshold) {
+        // pollFirst removes the candidate, so every partition is evicted at most once
+        TimePartitionInfo victim = candidates.pollFirst();
+        if (victim == null) {
+          // nothing left to evict; stop instead of looping forever
+          return;
+        }
+        memCost -= victim.memSize;
+        DataRegion dataRegion = StorageEngineV2.getInstance().getDataRegion(victim.dataRegionId);
+        if (dataRegion != null) {
+          dataRegion.releaseFlushTimeMap(victim.partitionId);
+        }
+        timePartitionInfoMap.get(victim.dataRegionId).remove(victim.partitionId);
+      }
+    }
+  }
+
+  /**
+   * Order eviction candidates by priority. Ties are broken by partition id and then by the hash
+   * of the data region id, because a TreeSet drops every element that compares equal to one it
+   * already holds; without the tie-break, partitions of equal priority would never be candidates.
+   */
+  private static int compareForEviction(TimePartitionInfo left, TimePartitionInfo right) {
+    int cmp = left.comparePriority(right);
+    if (cmp != 0) {
+      return cmp;
+    }
+    cmp = Long.compare(left.partitionId, right.partitionId);
+    if (cmp != 0) {
+      return cmp;
+    }
+    int leftHash = left.dataRegionId == null ? 0 : left.dataRegionId.hashCode();
+    int rightHash = right.dataRegionId == null ? 0 : right.dataRegionId.hashCode();
+    return Integer.compare(leftHash, rightHash);
+  }
+
+  /**
+   * Remove a partition and subtract its memory size from the total. Unknown partitions are
+   * ignored.
+   *
+   * @param dataRegionId data region of the partition
+   * @param partitionId id of the partition
+   */
+  public void removePartition(DataRegionId dataRegionId, long partitionId) {
+    synchronized (timePartitionInfoMap) {
+      Map<Long, TimePartitionInfo> timePartitionInfoMapForDataRegion =
+          timePartitionInfoMap.get(dataRegionId);
+      if (timePartitionInfoMapForDataRegion != null) {
+        TimePartitionInfo removed = timePartitionInfoMapForDataRegion.remove(partitionId);
+        if (removed != null) {
+          memCost -= removed.memSize;
+        }
+      }
+    }
+  }
+
+  /**
+   * Look up a registered partition.
+   *
+   * @param dataRegionId data region of the partition
+   * @param timePartitionId id of the partition
+   * @return the registered info, or null if the data region or the partition is unknown
+   */
+  public TimePartitionInfo getTimePartitionInfo(DataRegionId dataRegionId, long timePartitionId) {
+    synchronized (timePartitionInfoMap) {
+      Map<Long, TimePartitionInfo> timePartitionInfoMapForDataRegion =
+          timePartitionInfoMap.get(dataRegionId);
+      if (timePartitionInfoMapForDataRegion == null) {
+        return null;
+      }
+      return timePartitionInfoMapForDataRegion.get(timePartitionId);
+    }
+  }
+
+  /** Forget every registered partition and reset the memory cost to 0. */
+  public void clear() {
+    synchronized (timePartitionInfoMap) {
+      timePartitionInfoMap.clear();
+      memCost = 0;
+    }
+  }
+
+  @TestOnly
+  public void setTimePartitionInfoMemoryThreshold(long timePartitionInfoMemoryThreshold) {
+    this.timePartitionInfoMemoryThreshold = timePartitionInfoMemoryThreshold;
+  }
+
+  public static TimePartitionManager getInstance() {
+    return TimePartitionManager.InstanceHolder.instance;
+  }
+
+  /** Holder class; the singleton is created when this class is initialized. */
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static final TimePartitionManager instance = new TimePartitionManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 58d13bb673..725bcb13f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -442,4 +442,10 @@ public class TsFileManager {
return false;
}
}
+
+ /**
+  * Determine whether the given time partition is the latest (largest) one.
+  *
+  * @param timePartitionId id of the time partition to check
+  * @return true if neither sequenceFiles nor unsequenceFiles has a key higher than the given id
+  */
+ public boolean isLatestTimePartition(long timePartitionId) {
+   if (sequenceFiles.higherKey(timePartitionId) != null) {
+     return false;
+   }
+   return unsequenceFiles.higherKey(timePartitionId) == null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 35eda60ca8..66a72980c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -951,8 +951,15 @@ public class TsFileProcessor {
* flushManager again.
*/
private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
+ Map<String, Long> lastTimeForEachDevice = new HashMap<>();
+ if (sequence) {
+ lastTimeForEachDevice = tobeFlushed.getMaxTime();
+ tsFileResource.updateEndTime(lastTimeForEachDevice);
+ }
if (!tobeFlushed.isSignalMemTable()
- && (!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0)) {
+ && (tobeFlushed.memSize() == 0
+ || (!updateLatestFlushTimeCallback.call(
+ this, lastTimeForEachDevice, System.currentTimeMillis())))) {
logger.warn(
"This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
storageGroupName,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 0f2fd7424c..eb234e24ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -1076,6 +1076,12 @@ public class TsFileResource {
}
}
+ /**
+  * Pass every entry of the given map to the time index as an end time update.
+  *
+  * @param times key -> time; each pair is handed to timeIndex.updateEndTime
+  */
+ public void updateEndTime(Map<String, Long> times) {
+   times.forEach((key, endTime) -> timeIndex.updateEndTime(key, endTime));
+ }
+
+
/** @return is this tsfile resource in a TsFileResourceList */
public boolean isFileInList() {
return prev != null || next != null;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
index 3ef77edec7..9da30cfb55 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
@@ -34,10 +34,6 @@ public class DeviceEntry {
boolean isAligned;
- // for managing last time
- // time partition -> last time
- Map<Long, Long> lastTimeMapOfEachPartition;
-
// for managing flush time
// time partition -> flush time
Map<Long, Long> flushTimeMapOfEachPartition;
@@ -47,7 +43,6 @@ public class DeviceEntry {
public DeviceEntry(IDeviceID deviceID) {
this.deviceID = deviceID;
measurementMap = new ConcurrentHashMap<>();
- lastTimeMapOfEachPartition = new HashMap<>();
flushTimeMapOfEachPartition = new HashMap<>();
}
@@ -94,17 +89,9 @@ public class DeviceEntry {
}
// region support flush time
- public void putLastTimeMap(long timePartition, long lastTime) {
- lastTimeMapOfEachPartition.put(timePartition, lastTime);
- }
-
- public void putFlushTimeMap(long timePartition, long flushTime) {
- flushTimeMapOfEachPartition.put(timePartition, flushTime);
- }
- public long updateLastTimeMap(long timePartition, long lastTime) {
- return lastTimeMapOfEachPartition.compute(
- timePartition, (k, v) -> v == null ? lastTime : Math.max(v, lastTime));
+ public Long putFlushTimeMap(long timePartition, long flushTime) {
+ return flushTimeMapOfEachPartition.put(timePartition, flushTime);
}
public long updateFlushTimeMap(long timePartition, long flushTime) {
@@ -120,18 +107,10 @@ public class DeviceEntry {
this.globalFlushTime = globalFlushTime;
}
- public Long getLastTime(long timePartition) {
- return lastTimeMapOfEachPartition.get(timePartition);
- }
-
public Long getFlushTime(long timePartition) {
return flushTimeMapOfEachPartition.get(timePartition);
}
- public Long getLastTimeWithDefaultValue(long timePartition) {
- return lastTimeMapOfEachPartition.getOrDefault(timePartition, Long.MIN_VALUE);
- }
-
public Long getFLushTimeWithDefaultValue(long timePartition) {
return flushTimeMapOfEachPartition.getOrDefault(timePartition, Long.MIN_VALUE);
}
@@ -140,8 +119,8 @@ public class DeviceEntry {
return globalFlushTime;
}
- public void clearLastTime() {
- lastTimeMapOfEachPartition.clear();
+ public void removePartition(long partitionId) {
+ flushTimeMapOfEachPartition.remove(partitionId);
}
public void clearFlushTime() {
@@ -166,18 +145,12 @@ public class DeviceEntry {
&& globalFlushTime == that.globalFlushTime
&& deviceID.equals(that.deviceID)
&& measurementMap.equals(that.measurementMap)
- && lastTimeMapOfEachPartition.equals(that.lastTimeMapOfEachPartition)
&& flushTimeMapOfEachPartition.equals(that.flushTimeMapOfEachPartition);
}
@Override
public int hashCode() {
return Objects.hash(
- deviceID,
- measurementMap,
- isAligned,
- lastTimeMapOfEachPartition,
- flushTimeMapOfEachPartition,
- globalFlushTime);
+ deviceID, measurementMap, isAligned, flushTimeMapOfEachPartition, globalFlushTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 59b6defdd0..32d7648426 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -46,7 +46,7 @@ public class SystemInfo {
private long totalStorageGroupMemCost = 0L;
private volatile boolean rejected = false;
- private long memorySizeForWrite;
+ private long memorySizeForMemtable;
private long memorySizeForCompaction;
private Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
@@ -55,8 +55,8 @@ public class SystemInfo {
private ExecutorService flushTaskSubmitThreadPool =
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.FLUSH_TASK_SUBMIT.getName());
- private double FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
- private double REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+ private double FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
+ private double REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
private volatile boolean isEncodingFasterThanIo = true;
@@ -106,14 +106,14 @@ public class SystemInfo {
REJECT_THERSHOLD);
rejected = true;
if (chooseMemTablesToMarkFlush(tsFileProcessor)) {
- if (totalStorageGroupMemCost < memorySizeForWrite) {
+ if (totalStorageGroupMemCost < memorySizeForMemtable) {
return true;
} else {
throw new WriteProcessRejectException(
"Total Storage Group MemCost "
+ totalStorageGroupMemCost
+ " is over than memorySizeForWriting "
- + memorySizeForWrite);
+ + memorySizeForMemtable);
}
} else {
return false;
@@ -198,12 +198,13 @@ public class SystemInfo {
}
public void allocateWriteMemory() {
- memorySizeForWrite =
- (long) (config.getAllocateMemoryForStorageEngine() * config.getWriteProportion());
+ memorySizeForMemtable =
+ (long)
+ (config.getAllocateMemoryForStorageEngine() * config.getWriteProportionForMemtable());
memorySizeForCompaction =
(long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion());
- FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
- REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+ FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
+ REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
}
@TestOnly
@@ -284,15 +285,15 @@ public class SystemInfo {
}
public synchronized void applyTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) {
- memorySizeForWrite -= estimatedTemporaryMemSize;
- FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
- REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+ memorySizeForMemtable -= estimatedTemporaryMemSize;
+ FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
+ REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
}
public synchronized void releaseTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) {
- memorySizeForWrite += estimatedTemporaryMemSize;
- FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
- REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+ memorySizeForMemtable += estimatedTemporaryMemSize;
+ FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
+ REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
}
public long getTotalMemTableSize() {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index b6ff3044e7..01d900a8a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -170,7 +170,7 @@ public abstract class AlignedTVList extends TVList {
checkExpansion();
int arrayIndex = rowCount / ARRAY_SIZE;
int elementIndex = rowCount % ARRAY_SIZE;
- minTime = Math.min(minTime, timestamp);
+ maxTime = Math.max(maxTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
for (int i = 0; i < values.size(); i++) {
Object columnValue = columnIndexArray[i] < 0 ? null : value[columnIndexArray[i]];
@@ -704,7 +704,7 @@ public abstract class AlignedTVList extends TVList {
checkExpansion();
int idx = start;
- updateMinTimeAndSorted(time, start, end);
+ updateMaxTimeAndSorted(time, start, end);
while (idx < end) {
int inputRemaining = end - idx;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index 6c4e500495..1e644e41a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -87,7 +87,7 @@ public abstract class BinaryTVList extends TVList {
checkExpansion();
int arrayIndex = rowCount / ARRAY_SIZE;
int elementIndex = rowCount % ARRAY_SIZE;
- minTime = Math.min(minTime, timestamp);
+ maxTime = Math.max(maxTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
rowCount++;
@@ -105,12 +105,12 @@ public abstract class BinaryTVList extends TVList {
@Override
public int delete(long lowerBound, long upperBound) {
int newSize = 0;
- minTime = Long.MAX_VALUE;
+ maxTime = Long.MIN_VALUE;
for (int i = 0; i < rowCount; i++) {
long time = getTime(i);
if (time < lowerBound || time > upperBound) {
set(i, newSize++);
- minTime = Math.min(time, minTime);
+ maxTime = Math.max(maxTime, time);
} else {
memoryBinaryChunkSize -= getBinarySize(getBinary(i));
}
@@ -215,10 +215,10 @@ public abstract class BinaryTVList extends TVList {
timeIdxOffset = start;
// drop null at the end of value array
int nullCnt =
- dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+ dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
end -= nullCnt;
} else {
- updateMinTimeAndSorted(time, start, end);
+ updateMaxTimeAndSorted(time, start, end);
}
// update raw size
@@ -252,7 +252,7 @@ public abstract class BinaryTVList extends TVList {
}
// move null values to the end of time array and value array, then return number of null values
- int dropNullValThenUpdateMinTimeAndSorted(
+ int dropNullValThenUpdateMaxTimeAndSorted(
long[] time, Binary[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
long inPutMinTime = Long.MAX_VALUE;
boolean inputSorted = true;
@@ -269,14 +269,15 @@ public abstract class BinaryTVList extends TVList {
time[tIdx - nullCnt] = time[tIdx];
values[vIdx - nullCnt] = values[vIdx];
}
- // update minTime and sorted
+ // update maxTime and sorted
tIdx = tIdx - nullCnt;
inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+ maxTime = Math.max(maxTime, time[tIdx]);
if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
inputSorted = false;
}
}
- minTime = Math.min(inPutMinTime, minTime);
+
sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
return nullCnt;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index 5e6f7ea366..3af4613bc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -80,7 +80,7 @@ public abstract class BooleanTVList extends TVList {
checkExpansion();
int arrayIndex = rowCount / ARRAY_SIZE;
int elementIndex = rowCount % ARRAY_SIZE;
- minTime = Math.min(minTime, timestamp);
+ maxTime = Math.max(maxTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
rowCount++;
@@ -174,10 +174,10 @@ public abstract class BooleanTVList extends TVList {
timeIdxOffset = start;
// drop null at the end of value array
int nullCnt =
- dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+ dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
end -= nullCnt;
} else {
- updateMinTimeAndSorted(time, start, end);
+ updateMaxTimeAndSorted(time, start, end);
}
while (idx < end) {
@@ -206,7 +206,7 @@ public abstract class BooleanTVList extends TVList {
}
// move null values to the end of time array and value array, then return number of null values
- int dropNullValThenUpdateMinTimeAndSorted(
+ int dropNullValThenUpdateMaxTimeAndSorted(
long[] time, boolean[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
long inPutMinTime = Long.MAX_VALUE;
boolean inputSorted = true;
@@ -223,14 +223,15 @@ public abstract class BooleanTVList extends TVList {
time[tIdx - nullCnt] = time[tIdx];
values[vIdx - nullCnt] = values[vIdx];
}
- // update minTime and sorted
+ // update maxTime and sorted
tIdx = tIdx - nullCnt;
inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+ maxTime = Math.max(maxTime, time[tIdx]);
if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
inputSorted = false;
}
}
- minTime = Math.min(inPutMinTime, minTime);
+
sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
return nullCnt;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index 41ae89264c..98800c457e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -80,7 +80,7 @@ public abstract class DoubleTVList extends TVList {
checkExpansion();
int arrayIndex = rowCount / ARRAY_SIZE;
int elementIndex = rowCount % ARRAY_SIZE;
- minTime = Math.min(minTime, timestamp);
+ maxTime = Math.max(maxTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
rowCount++;
@@ -179,10 +179,10 @@ public abstract class DoubleTVList extends TVList {
timeIdxOffset = start;
// drop null at the end of value array
int nullCnt =
- dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+ dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
end -= nullCnt;
} else {
- updateMinTimeAndSorted(time, start, end);
+ updateMaxTimeAndSorted(time, start, end);
}
while (idx < end) {
@@ -211,7 +211,7 @@ public abstract class DoubleTVList extends TVList {
}
// move null values to the end of time array and value array, then return number of null values
- int dropNullValThenUpdateMinTimeAndSorted(
+ int dropNullValThenUpdateMaxTimeAndSorted(
long[] time, double[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
long inPutMinTime = Long.MAX_VALUE;
boolean inputSorted = true;
@@ -228,14 +228,15 @@ public abstract class DoubleTVList extends TVList {
time[tIdx - nullCnt] = time[tIdx];
values[vIdx - nullCnt] = values[vIdx];
}
- // update minTime and sorted
+ // update maxTime and sorted
tIdx = tIdx - nullCnt;
inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+ maxTime = Math.max(maxTime, time[tIdx]);
if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
inputSorted = false;
}
}
- minTime = Math.min(inPutMinTime, minTime);
+
sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
return nullCnt;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index 05e2b4289e..da39321830 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -80,7 +80,7 @@ public abstract class FloatTVList extends TVList {
checkExpansion();
int arrayIndex = rowCount / ARRAY_SIZE;
int elementIndex = rowCount % ARRAY_SIZE;
- minTime = Math.min(minTime, timestamp);
+ maxTime = Math.max(maxTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
rowCount++;
@@ -179,10 +179,10 @@ public abstract class FloatTVList extends TVList {
timeIdxOffset = start;
// drop null at the end of value array
int nullCnt =
- dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+ dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
end -= nullCnt;
} else {
- updateMinTimeAndSorted(time, start, end);
+ updateMaxTimeAndSorted(time, start, end);
}
while (idx < end) {
@@ -211,7 +211,7 @@ public abstract class FloatTVList extends TVList {
}
// move null values to the end of time array and value array, then return number of null values
- int dropNullValThenUpdateMinTimeAndSorted(
+ int dropNullValThenUpdateMaxTimeAndSorted(
long[] time, float[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
long inPutMinTime = Long.MAX_VALUE;
boolean inputSorted = true;
@@ -228,14 +228,15 @@ public abstract class FloatTVList extends TVList {
time[tIdx - nullCnt] = time[tIdx];
values[vIdx - nullCnt] = values[vIdx];
}
- // update minTime and sorted
+ // update maxTime and sorted
tIdx = tIdx - nullCnt;
inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+ maxTime = Math.max(maxTime, time[tIdx]);
if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
inputSorted = false;
}
}
- minTime = Math.min(inPutMinTime, minTime);
+
sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
return nullCnt;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index 9ea657755c..341deebbeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -79,7 +79,7 @@ public abstract class IntTVList extends TVList {
checkExpansion();
int arrayIndex = rowCount / ARRAY_SIZE;
int elementIndex = rowCount % ARRAY_SIZE;
- minTime = Math.min(minTime, timestamp);
+ maxTime = Math.max(maxTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
rowCount++;
@@ -172,10 +172,10 @@ public abstract class IntTVList extends TVList {
timeIdxOffset = start;
// drop null at the end of value array
int nullCnt =
- dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+ dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
end -= nullCnt;
} else {
- updateMinTimeAndSorted(time, start, end);
+ updateMaxTimeAndSorted(time, start, end);
}
while (idx < end) {
@@ -204,7 +204,7 @@ public abstract class IntTVList extends TVList {
}
// move null values to the end of time array and value array, then return number of null values
- int dropNullValThenUpdateMinTimeAndSorted(
+ int dropNullValThenUpdateMaxTimeAndSorted(
long[] time, int[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
long inPutMinTime = Long.MAX_VALUE;
boolean inputSorted = true;
@@ -221,14 +221,15 @@ public abstract class IntTVList extends TVList {
time[tIdx - nullCnt] = time[tIdx];
values[vIdx - nullCnt] = values[vIdx];
}
- // update minTime and sorted
+ // update maxTime and sorted
tIdx = tIdx - nullCnt;
inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+ maxTime = Math.max(maxTime, time[tIdx]);
if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
inputSorted = false;
}
}
- minTime = Math.min(inPutMinTime, minTime);
+
sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
return nullCnt;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index 41ce036ab7..7004543ac9 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -79,7 +79,7 @@ public abstract class LongTVList extends TVList {
checkExpansion();
int arrayIndex = rowCount / ARRAY_SIZE;
int elementIndex = rowCount % ARRAY_SIZE;
- minTime = Math.min(minTime, timestamp);
+ maxTime = Math.max(maxTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
values.get(arrayIndex)[elementIndex] = value;
rowCount++;
@@ -172,10 +172,10 @@ public abstract class LongTVList extends TVList {
timeIdxOffset = start;
// drop null at the end of value array
int nullCnt =
- dropNullValThenUpdateMinTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
+ dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset);
end -= nullCnt;
} else {
- updateMinTimeAndSorted(time, start, end);
+ updateMaxTimeAndSorted(time, start, end);
}
while (idx < end) {
@@ -204,7 +204,7 @@ public abstract class LongTVList extends TVList {
}
// move null values to the end of time array and value array, then return number of null values
- int dropNullValThenUpdateMinTimeAndSorted(
+ int dropNullValThenUpdateMaxTimeAndSorted(
long[] time, long[] values, BitMap bitMap, int start, int end, int tIdxOffset) {
long inPutMinTime = Long.MAX_VALUE;
boolean inputSorted = true;
@@ -221,14 +221,15 @@ public abstract class LongTVList extends TVList {
time[tIdx - nullCnt] = time[tIdx];
values[vIdx - nullCnt] = values[vIdx];
}
- // update minTime and sorted
+ // update maxTime and sorted
tIdx = tIdx - nullCnt;
inPutMinTime = Math.min(inPutMinTime, time[tIdx]);
+ maxTime = Math.max(maxTime, time[tIdx]);
if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) {
inputSorted = false;
}
}
- minTime = Math.min(inPutMinTime, minTime);
+
sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
return nullCnt;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index cb0f75c573..c6105744fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -57,7 +57,7 @@ public abstract class TVList implements WALEntryValue {
protected int rowCount;
protected boolean sorted = true;
- protected long minTime;
+ protected long maxTime;
// record reference count of this tv list
// currently this reference will only be increase because we can't know when to decrease it
protected AtomicInteger referenceCount;
@@ -66,7 +66,7 @@ public abstract class TVList implements WALEntryValue {
public TVList() {
timestamps = new ArrayList<>();
rowCount = 0;
- minTime = Long.MAX_VALUE;
+ maxTime = Long.MIN_VALUE;
referenceCount = new AtomicInteger();
}
@@ -228,8 +228,8 @@ public abstract class TVList implements WALEntryValue {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
- public long getMinTime() {
- return minTime;
+ public long getMaxTime() {
+ return maxTime;
}
public long getVersion() {
@@ -256,12 +256,12 @@ public abstract class TVList implements WALEntryValue {
public int delete(long lowerBound, long upperBound) {
int newSize = 0;
- minTime = Long.MAX_VALUE;
+ maxTime = Long.MIN_VALUE;
for (int i = 0; i < rowCount; i++) {
long time = getTime(i);
if (time < lowerBound || time > upperBound) {
set(i, newSize++);
- minTime = Math.min(time, minTime);
+ maxTime = Math.max(time, maxTime);
}
}
int deletedNumber = rowCount - newSize;
@@ -285,13 +285,13 @@ public abstract class TVList implements WALEntryValue {
}
cloneList.rowCount = rowCount;
cloneList.sorted = sorted;
- cloneList.minTime = minTime;
+ cloneList.maxTime = maxTime;
}
public void clear() {
rowCount = 0;
sorted = true;
- minTime = Long.MAX_VALUE;
+ maxTime = Long.MIN_VALUE;
clearTime();
clearValue();
}
@@ -324,17 +324,17 @@ public abstract class TVList implements WALEntryValue {
return cloneArray;
}
- void updateMinTimeAndSorted(long[] time, int start, int end) {
+ void updateMaxTimeAndSorted(long[] time, int start, int end) {
int length = time.length;
long inPutMinTime = Long.MAX_VALUE;
boolean inputSorted = true;
for (int i = start; i < end; i++) {
inPutMinTime = Math.min(inPutMinTime, time[i]);
+ maxTime = Math.max(maxTime, time[i]);
if (inputSorted && i < length - 1 && time[i] > time[i + 1]) {
inputSorted = false;
}
}
- minTime = Math.min(inPutMinTime, minTime);
sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index c19fad496c..8a5888601e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -1111,7 +1111,7 @@ public class DataRegionTest {
// delete data which is in work memtable
dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
- dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
+ dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 190, 0, null);
dataRegion.syncCloseAllWorkingTsFileProcessors();
Assert.assertFalse(tsFileResource.getModFile().exists());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMapTest.java
new file mode 100644
index 0000000000..dd3fa63b04
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMapTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+public class IDTableLastFlushTimeMapTest extends LastFlushTimeMapTest {
+ // NOTE(review): all members below are commented out, so this class currently adds no tests.
+ // private boolean isEnableIDTable = false;
+ //
+ // private String originalDeviceIDTransformationMethod = null;
+ //
+ // private boolean isEnableIDTableLogFile = false;
+ //
+ // public IDTableLastFlushTimeMapTest() throws QueryProcessException {}
+ //
+ // @Before
+ // public void before() {
+ // IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+ // isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
+ // originalDeviceIDTransformationMethod =
+ // IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+ // isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
+ //
+ // IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
+ // IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
+ // IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
+ // super.before();
+ // }
+ //
+ // @After
+ // public void clean() throws IOException, StorageEngineException {
+ // IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
+ // IoTDBDescriptor.getInstance()
+ // .getConfig()
+ // .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+ // IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
+ // super.clean();
+ // }
+ //
+ // @Override
+ // @Test
+ // public void testMemoryCalculation()
+ // throws QueryProcessException, IllegalPathException, StorageEngineException,
+ // StorageGroupNotSetException {
+ // insertRecord("root.sg.d1", 100L);
+ // CreateMultiTimeSeriesPlan plan = new CreateMultiTimeSeriesPlan();
+ // plan.setPaths(
+ // Arrays.asList(
+ // new PartialPath("root.sg.d100.s1"),
+ // new PartialPath("root.sg.d101.s1"),
+ // new PartialPath("root.sg.d102.s1"),
+ // new PartialPath("root.sg.d103.s1"),
+ // new PartialPath("root.sg.d104.s1")));
+ // plan.setDataTypes(
+ // Arrays.asList(
+ // TSDataType.INT64,
+ // TSDataType.INT64,
+ // TSDataType.INT64,
+ // TSDataType.INT64,
+ // TSDataType.INT64));
+ // plan.setEncodings(
+ // Arrays.asList(
+ // TSEncoding.GORILLA,
+ // TSEncoding.GORILLA,
+ // TSEncoding.GORILLA,
+ // TSEncoding.GORILLA,
+ // TSEncoding.GORILLA));
+ // plan.setCompressors(
+ // Arrays.asList(
+ // CompressionType.SNAPPY,
+ // CompressionType.SNAPPY,
+ // CompressionType.SNAPPY,
+ // CompressionType.SNAPPY,
+ // CompressionType.SNAPPY));
+ // executor.processNonQuery(plan);
+ //
+ // DataRegion storageGroupProcessor =
+ // StorageEngine.getInstance().getProcessor(new PartialPath("root.sg"));
+ // assertEquals(62l, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+ //
+ // storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.sg.d100");
+ // storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.sg.d101");
+ // assertEquals(186l, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+ //
+ // storageGroupProcessor.getLastFlushTimeMap().setOneDeviceFlushedTime(0L, "root.sg.d102", 0L);
+ // HashMap<String, Long> updateMap = new HashMap<>();
+ // updateMap.put("root.sg.d103", 1L);
+ // updateMap.put("root.sg.d100", 1L);
+ // storageGroupProcessor.getLastFlushTimeMap().setMultiDeviceFlushedTime(0L, updateMap);
+ // storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L, "root.sg.d103", 2L);
+ // storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L, "root.sg.d104", 2L);
+ // assertEquals(372L, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+ // }
+ //
+ // @Test
+ // public void testRecoverFlushTime()
+ // throws QueryProcessException, IllegalPathException, StorageEngineException,
+ // StorageGroupNotSetException {
+ // insertData(100);
+ // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ // executor.processNonQuery(flushPlan);
+ // DataRegion storageGroupProcessor =
+ // StorageEngine.getInstance().getProcessor(new PartialPath("root.isp"));
+ // String deviceId = DeviceIDFactory.getInstance().getDeviceID("root.isp.d1").toStringID();
+ // assertEquals(103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l,
+ // deviceId));
+ //
+ // storageGroupProcessor.getLastFlushTimeMap().removePartition(0l);
+ // storageGroupProcessor.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0l);
+ // assertEquals(103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l,
+ // deviceId));
+ // }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java
new file mode 100644
index 0000000000..2d945c6722
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+public class LastFlushTimeMapTest { // NOTE(review): whole body is commented out; no tests run
+ // protected PlanExecutor executor = new PlanExecutor();
+ //
+ // protected final Planner processor = new Planner();
+ //
+ // public LastFlushTimeMapTest() throws QueryProcessException {}
+ //
+ // @Before
+ // public void before() {
+ // IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+ // EnvironmentUtils.envSetUp();
+ // }
+ //
+ // @Test
+ // public void testSequenceInsert()
+ // throws MetadataException, QueryProcessException, StorageEngineException {
+ // insertData(0);
+ // insertData(10);
+ // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ // executor.processNonQuery(flushPlan);
+ //
+ // insertData(20);
+ //
+ // DataRegion storageGroupProcessor =
+ // StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+ // assertEquals(2, storageGroupProcessor.getSequenceFileList().size());
+ // assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
+ // }
+ //
+ // @Test
+ // public void testUnSequenceInsert()
+ // throws MetadataException, QueryProcessException, StorageEngineException {
+ // insertData(100);
+ // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ // executor.processNonQuery(flushPlan);
+ //
+ // insertData(20);
+ //
+ // DataRegion storageGroupProcessor =
+ // StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+ // assertEquals(1, storageGroupProcessor.getSequenceFileList().size());
+ // assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size());
+ // }
+ //
+ // @Test
+ // public void testSequenceAndUnSequenceInsert()
+ // throws MetadataException, QueryProcessException, StorageEngineException {
+ // // sequence
+ // insertData(100);
+ // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ // executor.processNonQuery(flushPlan);
+ //
+ // // sequence
+ // insertData(120);
+ // executor.processNonQuery(flushPlan);
+ //
+ // // unsequence
+ // insertData(20);
+ // // sequence
+ // insertData(130);
+ // executor.processNonQuery(flushPlan);
+ //
+ // // sequence
+ // insertData(150);
+ // // unsequence
+ // insertData(90);
+ //
+ // DataRegion storageGroupProcessor =
+ // StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+ // assertEquals(4, storageGroupProcessor.getSequenceFileList().size());
+ // assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size());
+ // assertEquals(1, storageGroupProcessor.getWorkSequenceTsFileProcessors().size());
+ // assertEquals(1, storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size());
+ // }
+ //
+ // @Test
+ // public void testDeletePartition()
+ // throws MetadataException, QueryProcessException, StorageEngineException {
+ // insertData(100);
+ // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ // executor.processNonQuery(flushPlan);
+ // insertData(20);
+ // insertData(120);
+ //
+ // DataRegion storageGroupProcessor =
+ // StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+ //
+ // assertEquals(
+ // 103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.isp.d1"));
+ // assertEquals(
+ // 103L, storageGroupProcessor.getLastFlushTimeMap().getGlobalFlushedTime("root.isp.d1"));
+ //
+ // // delete time partition
+ // Set<Long> deletedPartition = new HashSet<>();
+ // deletedPartition.add(0L);
+ // DeletePartitionPlan deletePartitionPlan =
+ // new DeletePartitionPlan(new PartialPath("root.isp"), deletedPartition);
+ // executor.processNonQuery(deletePartitionPlan);
+ //
+ // assertEquals(
+ // 123L, storageGroupProcessor.getLastFlushTimeMap().getGlobalFlushedTime("root.isp.d1"));
+ // }
+ //
+ // @Test
+ // public void testMemoryCalculation()
+ // throws QueryProcessException, IllegalPathException, StorageEngineException,
+ // StorageGroupNotSetException {
+ // insertRecord("root.sg.d1", 100L);
+ // DataRegion storageGroupProcessor =
+ // StorageEngine.getInstance().getProcessor(new PartialPath("root.sg"));
+ // assertEquals(98l, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+ //
+ // storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.sg.d100");
+ // storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.sg.d101");
+ // assertEquals(302l, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+ //
+ // storageGroupProcessor.getLastFlushTimeMap().setOneDeviceFlushedTime(0L, "root.sg.d102", 0L);
+ // HashMap<String, Long> updateMap = new HashMap<>();
+ // updateMap.put("root.sg.d103", 1L);
+ // updateMap.put("root.sg.d100", 1L);
+ // storageGroupProcessor.getLastFlushTimeMap().setMultiDeviceFlushedTime(0L, updateMap);
+ // storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L, "root.sg.d103", 2L);
+ // storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L, "root.sg.d104", 2L);
+ // assertEquals(608L, storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
+ // }
+ //
+ // @Test
+ // public void testRecoverFlushTime()
+ // throws QueryProcessException, IllegalPathException, StorageEngineException,
+ // StorageGroupNotSetException {
+ // insertData(100);
+ // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ // executor.processNonQuery(flushPlan);
+ // DataRegion storageGroupProcessor =
+ // StorageEngine.getInstance().getProcessor(new PartialPath("root.isp"));
+ // assertEquals(
+ // 103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l, "root.isp.d1"));
+ //
+ // storageGroupProcessor.getLastFlushTimeMap().removePartition(0l);
+ // storageGroupProcessor.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0l);
+ // assertEquals(
+ // 103L, storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l, "root.isp.d1"));
+ // }
+ //
+ // @After
+ // public void clean() throws IOException, StorageEngineException {
+ // EnvironmentUtils.cleanEnv();
+ // }
+ //
+ // protected void insertData(long initTime) throws IllegalPathException, QueryProcessException {
+ //
+ // long[] times = new long[] {initTime, initTime + 1, initTime + 2, initTime + 3};
+ // List<Integer> dataTypes = new ArrayList<>();
+ // dataTypes.add(TSDataType.DOUBLE.ordinal());
+ // dataTypes.add(TSDataType.FLOAT.ordinal());
+ // dataTypes.add(TSDataType.INT64.ordinal());
+ // dataTypes.add(TSDataType.INT32.ordinal());
+ // dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ // dataTypes.add(TSDataType.TEXT.ordinal());
+ //
+ // Object[] columns = new Object[6];
+ // columns[0] = new double[4];
+ // columns[1] = new float[4];
+ // columns[2] = new long[4];
+ // columns[3] = new int[4];
+ // columns[4] = new boolean[4];
+ // columns[5] = new Binary[4];
+ //
+ // for (int r = 0; r < 4; r++) {
+ // ((double[]) columns[0])[r] = 10.0 + r;
+ // ((float[]) columns[1])[r] = 20 + r;
+ // ((long[]) columns[2])[r] = 100000 + r;
+ // ((int[]) columns[3])[r] = 1000 + r;
+ // ((boolean[]) columns[4])[r] = false;
+ // ((Binary[]) columns[5])[r] = new Binary("mm" + r);
+ // }
+ //
+ // InsertTabletPlan tabletPlan =
+ // new InsertTabletPlan(
+ // new PartialPath("root.isp.d1"),
+ // new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ // dataTypes);
+ // tabletPlan.setTimes(times);
+ // tabletPlan.setColumns(columns);
+ // tabletPlan.setRowCount(times.length);
+ //
+ // executor.insertTablet(tabletPlan);
+ // }
+ //
+ // protected void insertRecord(String devicePath, long time)
+ // throws IllegalPathException, QueryProcessException {
+ // InsertRowPlan insertRowPlan =
+ // new InsertRowPlan(
+ // new PartialPath(devicePath),
+ // time,
+ // new String[] {"s1", "s2", "s3"},
+ // new TSDataType[] {TSDataType.INT32, TSDataType.INT32, TSDataType.INT32},
+ // new String[] {"1", "1", "1"});
+ //
+ // executor.insert(insertRowPlan);
+ // }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 8f96c9223d..6cc26b0a11 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -21,6 +21,7 @@
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
@@ -74,7 +75,9 @@ import static org.junit.Assert.assertTrue;
public class TTLTest {
private String sg1 = "root.TTL_SG1";
+ private DataRegionId dataRegionId1 = new DataRegionId(1);
private String sg2 = "root.TTL_SG2";
+ private DataRegionId dataRegionId2 = new DataRegionId(1);
private long ttl = 12345;
private DataRegion dataRegion;
private String s1 = "s1";
@@ -105,7 +108,7 @@ public class TTLTest {
dataRegion =
new DataRegion(
IoTDBDescriptor.getInstance().getConfig().getSystemDir(),
- sg1,
+ String.valueOf(dataRegionId1.getId()),
new DirectFlushPolicy(),
sg1);
IoTDB.schemaProcessor.createTimeseries(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManagerTest.java
new file mode 100644
index 0000000000..1b5f1f15b2
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManagerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TimePartitionManagerTest {
+  // Runs against the TimePartitionManager from getInstance(); setUp() sets its threshold to 100.
+  private final TimePartitionManager timePartitionManager = TimePartitionManager.getInstance();
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private long prevTimePartitionInfoMemoryThreshold;
+
+  public TimePartitionManagerTest() throws QueryProcessException {}
+
+  @Before
+  public void setUp() throws IOException, WriteProcessException, MetadataException {
+    prevTimePartitionInfoMemoryThreshold = CONFIG.getAllocateMemoryForTimePartitionInfo();
+    timePartitionManager.setTimePartitionInfoMemoryThreshold(100L);
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    timePartitionManager.setTimePartitionInfoMemoryThreshold(prevTimePartitionInfoMemoryThreshold);
+    timePartitionManager.clear();
+  }
+
+  @Test
+  public void testRegisterPartitionInfo() {
+    TimePartitionInfo timePartitionInfo1 =
+        new TimePartitionInfo(new DataRegionId(1), 0L, true, Long.MAX_VALUE, 0, true);
+    timePartitionManager.registerTimePartitionInfo(timePartitionInfo1);
+    // A registered info can be looked up again by (data region, partition id).
+    assertEquals(
+        timePartitionInfo1, timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 0L));
+
+    TimePartitionInfo timePartitionInfo2 =
+        new TimePartitionInfo(new DataRegionId(1), 1L, true, Long.MAX_VALUE, 0, true);
+    timePartitionManager.registerTimePartitionInfo(timePartitionInfo2);
+    // Registering partition 1 for the same region is expected to move the "latest" flag to it.
+    assertFalse(
+        timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 0L).isLatestPartition);
+    assertTrue(
+        timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 1L).isLatestPartition);
+  }
+
+  @Test
+  public void testUpdate() {
+    TimePartitionInfo timePartitionInfo =
+        new TimePartitionInfo(new DataRegionId(1), 0L, true, Long.MAX_VALUE, 0, true);
+    timePartitionManager.registerTimePartitionInfo(timePartitionInfo);
+    // Report a flush; the assertions below pin the flush time, memory size and active flag.
+    timePartitionManager.updateAfterFlushing(new DataRegionId(1), 0L, 100L, 100L, false);
+
+    TimePartitionInfo timePartitionInfo1 =
+        timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 0L);
+
+    assertTrue(timePartitionInfo1.isLatestPartition);
+    assertEquals(100L, timePartitionInfo1.lastSystemFlushTime);
+    assertEquals(100, timePartitionInfo1.memSize);
+    assertFalse(timePartitionInfo1.isActive);
+    // Opening a TsFileProcessor on the partition is expected to mark it active again.
+    timePartitionManager.updateAfterOpeningTsFileProcessor(new DataRegionId(1), 0L);
+    TimePartitionInfo timePartitionInfo2 =
+        timePartitionManager.getTimePartitionInfo(new DataRegionId(1), 0L);
+    assertTrue(timePartitionInfo2.isActive);
+  }
+
+  @Test
+  public void testMemoryControl() {
+    for (int i = 0; i < 5; i++) {
+      TimePartitionInfo timePartitionInfo =
+          new TimePartitionInfo(new DataRegionId(i), 0L, true, Long.MAX_VALUE, 0, true);
+      timePartitionManager.registerTimePartitionInfo(timePartitionInfo);
+    }
+    timePartitionManager.updateAfterFlushing(new DataRegionId(0), 0L, 100L, 20L, false);
+    timePartitionManager.updateAfterFlushing(new DataRegionId(1), 0L, 101L, 20L, true);
+    timePartitionManager.updateAfterFlushing(new DataRegionId(2), 0L, 102L, 20L, false);
+    timePartitionManager.updateAfterFlushing(new DataRegionId(3), 0L, 103L, 20L, false);
+    timePartitionManager.updateAfterFlushing(new DataRegionId(4), 0L, 104L, 20L, true);
+    timePartitionManager.registerTimePartitionInfo(
+        new TimePartitionInfo(new DataRegionId(0), 1L, true, Long.MAX_VALUE, 0, true));
+    // Each update below is expected to push usage past the threshold (100) and evict one entry.
+    timePartitionManager.updateAfterFlushing(new DataRegionId(0), 1L, 105L, 20L, true);
+
+    Assert.assertNull(timePartitionManager.getTimePartitionInfo(new DataRegionId(0), 0L));
+
+    timePartitionManager.updateAfterFlushing(new DataRegionId(0), 1L, 106L, 40L, true);
+
+    Assert.assertNull(timePartitionManager.getTimePartitionInfo(new DataRegionId(2), 0L));
+
+    timePartitionManager.updateAfterFlushing(new DataRegionId(0), 1L, 107L, 60L, true);
+
+    Assert.assertNull(timePartitionManager.getTimePartitionInfo(new DataRegionId(3), 0L));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index 6b006f8966..ca944ce80c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -110,7 +110,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> true,
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -169,7 +169,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> true,
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -256,7 +256,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> true,
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -299,7 +299,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> true,
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -335,7 +335,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> true,
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -371,7 +371,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> true,
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
deleted file mode 100644
index 4eab19cb67..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.metadata.idtable;
-
-// public class IDTableFlushTimeTest {
-// private PlanExecutor executor = new PlanExecutor();
-//
-// private final Planner processor = new Planner();
-//
-// private boolean isEnableIDTable = false;
-//
-// private String originalDeviceIDTransformationMethod = null;
-//
-// private boolean isEnableIDTableLogFile = false;
-//
-// public IDTableFlushTimeTest() throws QueryProcessException {}
-//
-// @Before
-// public void before() {
-// IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
-// isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
-// originalDeviceIDTransformationMethod =
-// IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
-// isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
-//
-// IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
-// IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
-// IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
-// EnvironmentUtils.envSetUp();
-// }
-//
-// @After
-// public void clean() throws IOException, StorageEngineException {
-// IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
-// IoTDBDescriptor.getInstance()
-// .getConfig()
-// .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
-// IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
-// EnvironmentUtils.cleanEnv();
-// }
-//
-// @Test
-// public void testSequenceInsert()
-// throws MetadataException, QueryProcessException, StorageEngineException {
-// insertData(0);
-// insertData(10);
-// PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
-// executor.processNonQuery(flushPlan);
-//
-// insertData(20);
-//
-// DataRegion storageGroupProcessor =
-// StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
-// assertEquals(2, storageGroupProcessor.getSequenceFileList().size());
-// assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
-// }
-//
-// @Test
-// public void testUnSequenceInsert()
-// throws MetadataException, QueryProcessException, StorageEngineException {
-// insertData(100);
-// PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
-// executor.processNonQuery(flushPlan);
-//
-// insertData(20);
-//
-// DataRegion storageGroupProcessor =
-// StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
-// assertEquals(1, storageGroupProcessor.getSequenceFileList().size());
-// assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size());
-// }
-//
-// @Test
-// public void testSequenceAndUnSequenceInsert()
-// throws MetadataException, QueryProcessException, StorageEngineException {
-// // sequence
-// insertData(100);
-// PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
-// executor.processNonQuery(flushPlan);
-//
-// // sequence
-// insertData(120);
-// executor.processNonQuery(flushPlan);
-//
-// // unsequence
-// insertData(20);
-// // sequence
-// insertData(130);
-// executor.processNonQuery(flushPlan);
-//
-// // sequence
-// insertData(150);
-// // unsequence
-// insertData(90);
-//
-// DataRegion storageGroupProcessor =
-// StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
-// assertEquals(4, storageGroupProcessor.getSequenceFileList().size());
-// assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size());
-// assertEquals(1, storageGroupProcessor.getWorkSequenceTsFileProcessors().size());
-// assertEquals(1, storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size());
-// }
-//
-// @Test
-// public void testDeletePartition()
-// throws MetadataException, QueryProcessException, StorageEngineException {
-// insertData(100);
-// PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
-// executor.processNonQuery(flushPlan);
-// insertData(20);
-// insertData(120);
-//
-// DataRegion storageGroupProcessor =
-// StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
-//
-// assertEquals(
-// 103L, storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1"));
-// assertEquals(
-// 123L, storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1"));
-// assertEquals(
-// 103L,
-// storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1"));
-//
-// // delete time partition
-// Set<Long> deletedPartition = new HashSet<>();
-// deletedPartition.add(0L);
-// DeletePartitionPlan deletePartitionPlan =
-// new DeletePartitionPlan(new PartialPath("root.isp"), deletedPartition);
-// executor.processNonQuery(deletePartitionPlan);
-//
-// assertEquals(
-// Long.MIN_VALUE,
-// storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1"));
-// assertEquals(
-// Long.MIN_VALUE,
-// storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1"));
-// assertEquals(
-// 123L,
-// storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1"));
-// }
-//
-// private void insertData(long initTime) throws IllegalPathException, QueryProcessException {
-//
-// long[] times = new long[] {initTime, initTime + 1, initTime + 2, initTime + 3};
-// List<Integer> dataTypes = new ArrayList<>();
-// dataTypes.add(TSDataType.DOUBLE.ordinal());
-// dataTypes.add(TSDataType.FLOAT.ordinal());
-// dataTypes.add(TSDataType.INT64.ordinal());
-// dataTypes.add(TSDataType.INT32.ordinal());
-// dataTypes.add(TSDataType.BOOLEAN.ordinal());
-// dataTypes.add(TSDataType.TEXT.ordinal());
-//
-// Object[] columns = new Object[6];
-// columns[0] = new double[4];
-// columns[1] = new float[4];
-// columns[2] = new long[4];
-// columns[3] = new int[4];
-// columns[4] = new boolean[4];
-// columns[5] = new Binary[4];
-//
-// for (int r = 0; r < 4; r++) {
-// ((double[]) columns[0])[r] = 10.0 + r;
-// ((float[]) columns[1])[r] = 20 + r;
-// ((long[]) columns[2])[r] = 100000 + r;
-// ((int[]) columns[3])[r] = 1000 + r;
-// ((boolean[]) columns[4])[r] = false;
-// ((Binary[]) columns[5])[r] = new Binary("mm" + r);
-// }
-//
-// InsertTabletPlan tabletPlan =
-// new InsertTabletPlan(
-// new PartialPath("root.isp.d1"),
-// new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
-// dataTypes);
-// tabletPlan.setTimes(times);
-// tabletPlan.setColumns(columns);
-// tabletPlan.setRowCount(times.length);
-//
-// PlanExecutor executor = new PlanExecutor();
-// executor.insertTablet(tabletPlan);
-// }
-// }