You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/10 09:10:25 UTC
[iotdb] 01/01: fix compaction ttl bug
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_compaction_ttl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 21f566b07660c58ead6162f4e6dcc3ba92a5bd86
Author: qiaojialin <64...@qq.com>
AuthorDate: Thu Jun 10 17:09:45 2021 +0800
fix compaction ttl bug
---
.../level/LevelCompactionTsFileManagement.java | 7 +
.../engine/compaction/utils/CompactionUtils.java | 284 +++++++++++----------
2 files changed, 151 insertions(+), 140 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 98f1c2e..afefcd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -660,6 +660,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
// log source file list and target file for recover
for (TsFileResource mergeResource : mergeResources.get(i)) {
+ mergeResource.setMerging(true);
compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
}
File newLevelFile =
@@ -844,6 +845,12 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
logAnalyzer.analyze();
String targetFilePath = logAnalyzer.getTargetFile();
+ List<String> sourceFileList = logAnalyzer.getSourceFiles();
+ boolean isSeq = logAnalyzer.isSeq();
+ for (String file : sourceFileList) {
+ TsFileResource fileResource = getTsFileResource(file, isSeq);
+ fileResource.setMerging(false);
+ }
if (targetFilePath != null) {
File targetFile = new File(targetFilePath);
if (targetFile.exists()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index a6aa68c..22705af 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -260,142 +260,92 @@ public class CompactionUtils {
boolean sequence,
List<Modification> modifications)
throws IOException, IllegalPathException {
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
- Map<String, List<Modification>> modificationCache = new HashMap<>();
- RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
- Set<String> tsFileDevicesMap =
- getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
- for (String device : tsFileDevicesMap) {
- if (devices.contains(device)) {
- continue;
- }
- writer.startChunkGroup(device);
- Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
- new TreeMap<>(
- (o1, o2) ->
- TsFileManagement.compareFileName(
- new File(o1.getFileName()), new File(o2.getFileName())));
- Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
- chunkMetadataListIteratorCache =
- new TreeMap<>(
- (o1, o2) ->
- TsFileManagement.compareFileName(
- new File(o1.getFileName()), new File(o2.getFileName())));
- for (TsFileResource tsFileResource : tsFileResources) {
- TsFileSequenceReader reader =
- buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
- if (reader == null) {
- throw new IOException();
+ try {
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+ Map<String, List<Modification>> modificationCache = new HashMap<>();
+ RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE()
+ .getMergeWriteRateLimiter();
+ Set<String> tsFileDevicesMap =
+ getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
+ for (String device : tsFileDevicesMap) {
+ if (devices.contains(device)) {
+ continue;
}
- Iterator<Map<String, List<ChunkMetadata>>> iterator =
- reader.getMeasurementChunkMetadataListMapIterator(device);
- chunkMetadataListIteratorCache.put(reader, iterator);
- chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
- }
- while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
- String lastSensor = null;
- Set<String> allSensors = new HashSet<>();
- for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
- chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
- TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
- Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
- chunkMetadataListCacheForMergeEntry.getValue();
- if (sensorChunkMetadataListMap.size() <= 0) {
- if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
- sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
- chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
- } else {
- continue;
- }
- }
- // get the min last sensor in the current chunkMetadata cache list for merge
- String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
- if (lastSensor == null) {
- lastSensor = maxSensor;
- } else {
- if (maxSensor.compareTo(lastSensor) < 0) {
- lastSensor = maxSensor;
- }
+ writer.startChunkGroup(device);
+ Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
+ new TreeMap<>(
+ (o1, o2) ->
+ TsFileManagement.compareFileName(
+ new File(o1.getFileName()), new File(o2.getFileName())));
+ Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
+ chunkMetadataListIteratorCache =
+ new TreeMap<>(
+ (o1, o2) ->
+ TsFileManagement.compareFileName(
+ new File(o1.getFileName()), new File(o2.getFileName())));
+ for (TsFileResource tsFileResource : tsFileResources) {
+ TsFileSequenceReader reader =
+ buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
+ if (reader == null) {
+ throw new IOException();
}
- // get all sensor used later
- allSensors.addAll(sensorChunkMetadataListMap.keySet());
+ Iterator<Map<String, List<ChunkMetadata>>> iterator =
+ reader.getMeasurementChunkMetadataListMapIterator(device);
+ chunkMetadataListIteratorCache.put(reader, iterator);
+ chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
}
-
- for (String sensor : allSensors) {
- if (sensor.compareTo(lastSensor) <= 0) {
- Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
- new TreeMap<>(
- (o1, o2) ->
- TsFileManagement.compareFileName(
- new File(o1.getFileName()), new File(o2.getFileName())));
- // find all chunkMetadata of a sensor
- for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
- chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
- TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
- Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
- chunkMetadataListCacheForMergeEntry.getValue();
- if (sensorChunkMetadataListMap.containsKey(sensor)) {
- readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
- sensorChunkMetadataListMap.remove(sensor);
+ while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+ String lastSensor = null;
+ Set<String> allSensors = new HashSet<>();
+ for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+ chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+ TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+ Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+ chunkMetadataListCacheForMergeEntry.getValue();
+ if (sensorChunkMetadataListMap.size() <= 0) {
+ if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
+ sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
+ chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
+ } else {
+ continue;
}
}
- Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
- sensorReaderChunkMetadataListEntry =
- new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
- if (!sequence) {
- writeByDeserializePageMerge(
- device,
- compactionWriteRateLimiter,
- sensorReaderChunkMetadataListEntry,
- targetResource,
- writer,
- modificationCache,
- modifications);
+ // track the smallest of the per-reader maximum sensor names in the current chunkMetadata cache
+ String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
+ if (lastSensor == null) {
+ lastSensor = maxSensor;
} else {
- boolean isChunkEnoughLarge = true;
- boolean isPageEnoughLarge = true;
- for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
- for (ChunkMetadata chunkMetadata : chunkMetadatas) {
- if (chunkMetadata.getNumOfPoints()
- < IoTDBDescriptor.getInstance()
- .getConfig()
- .getMergePagePointNumberThreshold()) {
- isPageEnoughLarge = false;
- }
- if (chunkMetadata.getNumOfPoints()
- < IoTDBDescriptor.getInstance()
- .getConfig()
- .getMergeChunkPointNumberThreshold()) {
- isChunkEnoughLarge = false;
- }
+ if (maxSensor.compareTo(lastSensor) < 0) {
+ lastSensor = maxSensor;
+ }
+ }
+ // collect all sensors seen in this round; they are used below
+ allSensors.addAll(sensorChunkMetadataListMap.keySet());
+ }
+
+ for (String sensor : allSensors) {
+ if (sensor.compareTo(lastSensor) <= 0) {
+ Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+ new TreeMap<>(
+ (o1, o2) ->
+ TsFileManagement.compareFileName(
+ new File(o1.getFileName()), new File(o2.getFileName())));
+ // find all chunkMetadata of a sensor
+ for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+ chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+ TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+ Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+ chunkMetadataListCacheForMergeEntry.getValue();
+ if (sensorChunkMetadataListMap.containsKey(sensor)) {
+ readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
+ sensorChunkMetadataListMap.remove(sensor);
}
}
- // if a chunk is large enough, it's page must be large enough too
- if (isChunkEnoughLarge) {
- logger.debug(
- "{} [Compaction] chunk enough large, use append chunk merge", storageGroup);
- // append page in chunks, so we do not have to deserialize a chunk
- writeByAppendChunkMerge(
- device,
- compactionWriteRateLimiter,
- sensorReaderChunkMetadataListEntry,
- targetResource,
- writer);
- } else if (isPageEnoughLarge) {
- logger.debug(
- "{} [Compaction] page enough large, use append page merge", storageGroup);
- // append page in chunks, so we do not have to deserialize a chunk
- writeByAppendPageMerge(
- device,
- compactionWriteRateLimiter,
- sensorReaderChunkMetadataListEntry,
- targetResource,
- writer);
- } else {
- logger.debug(
- "{} [Compaction] page too small, use deserialize page merge", storageGroup);
- // we have to deserialize chunks to merge pages
+ Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
+ sensorReaderChunkMetadataListEntry =
+ new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
+ if (!sequence) {
writeByDeserializePageMerge(
device,
compactionWriteRateLimiter,
@@ -404,27 +354,81 @@ public class CompactionUtils {
writer,
modificationCache,
modifications);
+ } else {
+ boolean isChunkEnoughLarge = true;
+ boolean isPageEnoughLarge = true;
+ for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
+ for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+ if (chunkMetadata.getNumOfPoints()
+ < IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getMergePagePointNumberThreshold()) {
+ isPageEnoughLarge = false;
+ }
+ if (chunkMetadata.getNumOfPoints()
+ < IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getMergeChunkPointNumberThreshold()) {
+ isChunkEnoughLarge = false;
+ }
+ }
+ }
+ // if a chunk is large enough, its page must be large enough too
+ if (isChunkEnoughLarge) {
+ logger.debug(
+ "{} [Compaction] chunk enough large, use append chunk merge", storageGroup);
+ // append page in chunks, so we do not have to deserialize a chunk
+ writeByAppendChunkMerge(
+ device,
+ compactionWriteRateLimiter,
+ sensorReaderChunkMetadataListEntry,
+ targetResource,
+ writer);
+ } else if (isPageEnoughLarge) {
+ logger.debug(
+ "{} [Compaction] page enough large, use append page merge", storageGroup);
+ // append page in chunks, so we do not have to deserialize a chunk
+ writeByAppendPageMerge(
+ device,
+ compactionWriteRateLimiter,
+ sensorReaderChunkMetadataListEntry,
+ targetResource,
+ writer);
+ } else {
+ logger.debug(
+ "{} [Compaction] page too small, use deserialize page merge", storageGroup);
+ // we have to deserialize chunks to merge pages
+ writeByDeserializePageMerge(
+ device,
+ compactionWriteRateLimiter,
+ sensorReaderChunkMetadataListEntry,
+ targetResource,
+ writer,
+ modificationCache,
+ modifications);
+ }
}
}
}
}
+ writer.endChunkGroup();
+ if (compactionLogger != null) {
+ compactionLogger.logDevice(device, writer.getPos());
+ }
}
- writer.endChunkGroup();
- if (compactionLogger != null) {
- compactionLogger.logDevice(device, writer.getPos());
- }
- }
- for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
- reader.close();
- }
+ for (TsFileResource tsFileResource : tsFileResources) {
+ targetResource.updatePlanIndexes(tsFileResource);
+ }
+ targetResource.serialize();
+ writer.endFile();
+ targetResource.close();
- for (TsFileResource tsFileResource : tsFileResources) {
- targetResource.updatePlanIndexes(tsFileResource);
+ } finally {
+ for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
+ reader.close();
+ }
}
- targetResource.serialize();
- writer.endFile();
- targetResource.close();
}
private static TsFileSequenceReader buildReaderFromTsFileResource(