Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/06/04 10:52:38 UTC
[iotdb] branch rel/0.11 updated: fix compaction ttl bug (#3341)
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 5d0d5ac fix compaction ttl bug (#3341)
5d0d5ac is described below
commit 5d0d5acd9db9048a4ec095c7503615bda183d59a
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Jun 4 18:52:20 2021 +0800
fix compaction ttl bug (#3341)
Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
.../level/LevelCompactionTsFileManagement.java | 90 ++++++--
.../engine/compaction/utils/CompactionUtils.java | 250 +++++++++++----------
2 files changed, 196 insertions(+), 144 deletions(-)
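In brief: this patch hardens level compaction against mid-merge failures, such as the TTL-related one in the subject line. The compaction task now records every source file in a compaction log and marks it as merging; if the merge throws, the task closes the log, clears the merging flags, deletes any partially written target file, and removes the log. A minimal sketch of that restore step, using illustrative stand-in names (RestoreSketch, restore, partialTarget) rather than the actual IoTDB APIs shown in the diff below:

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    // Sketch only: mirrors the shape of restoreCompaction() in this patch,
    // not its exact API. All names here are hypothetical.
    public class RestoreSketch {
        static void restore(File logFile, File partialTarget) {
            try {
                // Delete the half-written target so queries never see it.
                if (partialTarget.exists() && !partialTarget.delete()) {
                    System.err.println("could not delete " + partialTarget);
                }
            } finally {
                // The log is always removed, so a stale log cannot trigger
                // another restore on the next startup.
                if (logFile.exists()) {
                    try {
                        Files.delete(logFile.toPath());
                    } catch (IOException e) {
                        System.err.println("delete compaction log failed: " + e);
                    }
                }
            }
        }
    }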
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index c3281e3..9f00b4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -124,7 +124,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
logger.debug("{} [compaction] merge starts to delete real file", storageGroupName);
for (TsFileResource mergeTsFile : mergeTsFiles) {
deleteLevelFile(mergeTsFile);
- logger.info(
+ logger.debug(
"{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath());
}
}
@@ -570,20 +570,22 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (enableUnseqCompaction
&& unseqLevelNum <= 1
&& forkedUnSequenceTsFileResources.get(0).size() > 0) {
- isMergeExecutedInCurrentTask = isMergeExecutedInCurrentTask ||
- merge(
- isForceFullMerge,
- getTsFileListByTimePartition(true, timePartition),
- forkedUnSequenceTsFileResources.get(0),
- Long.MAX_VALUE);
+ isMergeExecutedInCurrentTask =
+ isMergeExecutedInCurrentTask
+ || merge(
+ isForceFullMerge,
+ getTsFileListByTimePartition(true, timePartition),
+ forkedUnSequenceTsFileResources.get(0),
+ Long.MAX_VALUE);
} else {
- isMergeExecutedInCurrentTask = isMergeExecutedInCurrentTask ||
- merge(
- forkedUnSequenceTsFileResources,
- false,
- timePartition,
- unseqLevelNum,
- unseqFileNumInEachLevel);
+ isMergeExecutedInCurrentTask =
+ isMergeExecutedInCurrentTask
+ || merge(
+ forkedUnSequenceTsFileResources,
+ false,
+ timePartition,
+ unseqLevelNum,
+ unseqFileNumInEachLevel);
}
}
@@ -608,6 +610,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
long startTimeMillis = System.currentTimeMillis();
// whether execute merge chunk in the loop below
boolean isMergeExecutedInCurrentTask = false;
+ CompactionLogger compactionLogger = null;
try {
logger.info("{} start to filter compaction condition", storageGroupName);
for (int i = 0; i < currMaxLevel - 1; i++) {
@@ -620,16 +623,17 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
// do not merge current unseq file level to upper level and just merge all of them to
// seq file
isSeqMerging = false;
- isMergeExecutedInCurrentTask = merge(
- isForceFullMerge,
- getTsFileListByTimePartition(true, timePartition),
- mergeResources.get(i),
- Long.MAX_VALUE);
+ isMergeExecutedInCurrentTask =
+ merge(
+ isForceFullMerge,
+ getTsFileListByTimePartition(true, timePartition),
+ mergeResources.get(i),
+ Long.MAX_VALUE);
} else {
- CompactionLogger compactionLogger =
- new CompactionLogger(storageGroupDir, storageGroupName);
+ compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
// log source file list and target file for recover
for (TsFileResource mergeResource : mergeResources.get(i)) {
+ mergeResource.setMerging(true);
compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
}
File newLevelFile =
@@ -691,6 +695,14 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
}
} catch (Exception e) {
+ if (compactionLogger != null) {
+ try {
+ compactionLogger.close();
+ } catch (IOException ioException) {
+ logger.error("{} Compaction log close fail", storageGroupName + COMPACTION_LOG_NAME);
+ }
+ }
+ restoreCompaction();
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
isSeqMerging = false;
@@ -704,6 +716,42 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
return isMergeExecutedInCurrentTask;
}
+ /** restore the files back to the status before the compaction task is submitted */
+ private void restoreCompaction() {
+ File logFile =
+ FSFactoryProducer.getFSFactory()
+ .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
+ try {
+ if (logFile.exists()) {
+ CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
+ logAnalyzer.analyze();
+ String targetFilePath = logAnalyzer.getTargetFile();
+ List<String> sourceFileList = logAnalyzer.getSourceFiles();
+ boolean isSeq = logAnalyzer.isSeq();
+ for (String file : sourceFileList) {
+ TsFileResource fileResource = getTsFileResource(file, isSeq);
+ fileResource.setMerging(false);
+ }
+ if (targetFilePath != null) {
+ File targetFile = new File(targetFilePath);
+ if (targetFile.exists()) {
+ targetFile.delete();
+ }
+ }
+ }
+ } catch (IOException e) {
+ logger.error("restore compaction failed", e);
+ } finally {
+ if (logFile.exists()) {
+ try {
+ Files.delete(logFile.toPath());
+ } catch (IOException e) {
+ logger.error("delete compaction log file error ", e);
+ }
+ }
+ }
+ }
+
/** if level < maxLevel-1, the file needs compaction; otherwise, the file can be merged later */
private File createNewTsFileName(File sourceFile, int level) {
String path = sourceFile.getPath();
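The LevelCompactionTsFileManagement changes above share one discipline: the CompactionLogger is declared before the try block so the catch clause can close it, each source resource is flagged with setMerging(true) as it is logged, and on any exception restoreCompaction() rolls the flags and files back. A self-contained sketch of that shape, with simplified stand-in types (MergeLog, Source, doMergeWork) in place of the real classes:

    import java.io.Closeable;
    import java.io.File;
    import java.io.IOException;
    import java.util.List;

    // Sketch only: MergeLog and Source are simplified stand-ins for
    // CompactionLogger and TsFileResource.
    public class MergeTaskSketch {
        interface MergeLog extends Closeable {
            void logSource(File f) throws IOException;
        }

        static class Source {
            final File file;
            boolean merging;
            Source(File file) { this.file = file; }
        }

        static boolean runMerge(MergeLog log, List<Source> sources, Runnable doMergeWork) {
            try {
                for (Source s : sources) {
                    s.merging = true;      // flag the file as part of an in-flight merge
                    log.logSource(s.file); // record it for crash recovery
                }
                doMergeWork.run();         // the actual compaction
                return true;
            } catch (Exception e) {
                try {
                    log.close();           // release the log before touching files
                } catch (IOException ignored) {
                    // best-effort close, as in the patch
                }
                for (Source s : sources) {
                    s.merging = false;     // roll the flags back
                }
                return false;
            }
        }
    }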
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 3ca13fe..4662569 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -254,125 +254,93 @@ public class CompactionUtils {
boolean sequence,
List<Modification> modifications)
throws IOException, IllegalPathException {
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
Map<String, List<Modification>> modificationCache = new HashMap<>();
- RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
- Set<String> tsFileDevicesMap =
- getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
- for (String device : tsFileDevicesMap) {
- if (devices.contains(device)) {
- continue;
- }
- long maxVersion = Long.MIN_VALUE;
- writer.startChunkGroup(device);
- Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
- new TreeMap<>(
- (o1, o2) ->
- TsFileManagement.compareFileName(
- new File(o1.getFileName()), new File(o2.getFileName())));
- Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
- chunkMetadataListIteratorCache =
- new TreeMap<>(
- (o1, o2) ->
- TsFileManagement.compareFileName(
- new File(o1.getFileName()), new File(o2.getFileName())));
- for (TsFileResource tsFileResource : tsFileResources) {
- TsFileSequenceReader reader =
- buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
- if (reader == null) {
- throw new IOException();
- }
- Iterator<Map<String, List<ChunkMetadata>>> iterator =
- reader.getMeasurementChunkMetadataListMapIterator(device);
- chunkMetadataListIteratorCache.put(reader, iterator);
- chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
- }
- while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
- String lastSensor = null;
- Set<String> allSensors = new HashSet<>();
- for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
- chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
- TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
- Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
- chunkMetadataListCacheForMergeEntry.getValue();
- if (sensorChunkMetadataListMap.size() <= 0) {
- if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
- sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
- chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
- } else {
- continue;
- }
- }
- // get the min last sensor in the current chunkMetadata cache list for merge
- String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
- if (lastSensor == null) {
- lastSensor = maxSensor;
- } else {
- if (maxSensor.compareTo(lastSensor) < 0) {
- lastSensor = maxSensor;
- }
- }
- // get all sensor used later
- allSensors.addAll(sensorChunkMetadataListMap.keySet());
+ try {
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+ RateLimiter compactionWriteRateLimiter =
+ MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+ Set<String> tsFileDevicesMap =
+ getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap, storageGroup);
+ for (String device : tsFileDevicesMap) {
+ if (devices.contains(device)) {
+ continue;
}
-
- for (String sensor : allSensors) {
- if (sensor.compareTo(lastSensor) <= 0) {
- Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+ long maxVersion = Long.MIN_VALUE;
+ writer.startChunkGroup(device);
+ Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> chunkMetadataListCacheForMerge =
+ new TreeMap<>(
+ (o1, o2) ->
+ TsFileManagement.compareFileName(
+ new File(o1.getFileName()), new File(o2.getFileName())));
+ Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>>
+ chunkMetadataListIteratorCache =
new TreeMap<>(
(o1, o2) ->
TsFileManagement.compareFileName(
new File(o1.getFileName()), new File(o2.getFileName())));
- // find all chunkMetadata of a sensor
- for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
- chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
- TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
- Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
- chunkMetadataListCacheForMergeEntry.getValue();
- if (sensorChunkMetadataListMap.containsKey(sensor)) {
- readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
- sensorChunkMetadataListMap.remove(sensor);
+ for (TsFileResource tsFileResource : tsFileResources) {
+ TsFileSequenceReader reader =
+ buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
+ if (reader == null) {
+ throw new IOException();
+ }
+ Iterator<Map<String, List<ChunkMetadata>>> iterator =
+ reader.getMeasurementChunkMetadataListMapIterator(device);
+ chunkMetadataListIteratorCache.put(reader, iterator);
+ chunkMetadataListCacheForMerge.put(reader, new TreeMap<>());
+ }
+ while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
+ String lastSensor = null;
+ Set<String> allSensors = new HashSet<>();
+ for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+ chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+ TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+ Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+ chunkMetadataListCacheForMergeEntry.getValue();
+ if (sensorChunkMetadataListMap.size() <= 0) {
+ if (chunkMetadataListIteratorCache.get(reader).hasNext()) {
+ sensorChunkMetadataListMap = chunkMetadataListIteratorCache.get(reader).next();
+ chunkMetadataListCacheForMerge.put(reader, sensorChunkMetadataListMap);
+ } else {
+ continue;
}
}
- Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
- sensorReaderChunkMetadataListEntry =
- new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
- if (!sequence) {
- writeByDeserializeMerge(
- maxVersion,
- device,
- compactionWriteRateLimiter,
- sensorReaderChunkMetadataListEntry,
- targetResource,
- writer,
- modificationCache,
- modifications);
+ // get the min last sensor in the current chunkMetadata cache list for merge
+ String maxSensor = Collections.max(sensorChunkMetadataListMap.keySet());
+ if (lastSensor == null) {
+ lastSensor = maxSensor;
} else {
- boolean isPageEnoughLarge = true;
- for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
- for (ChunkMetadata chunkMetadata : chunkMetadatas) {
- if (chunkMetadata.getNumOfPoints() < MERGE_PAGE_POINT_NUM) {
- isPageEnoughLarge = false;
- break;
- }
+ if (maxSensor.compareTo(lastSensor) < 0) {
+ lastSensor = maxSensor;
+ }
+ }
+ // get all sensor used later
+ allSensors.addAll(sensorChunkMetadataListMap.keySet());
+ }
+
+ for (String sensor : allSensors) {
+ if (sensor.compareTo(lastSensor) <= 0) {
+ Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
+ new TreeMap<>(
+ (o1, o2) ->
+ TsFileManagement.compareFileName(
+ new File(o1.getFileName()), new File(o2.getFileName())));
+ // find all chunkMetadata of a sensor
+ for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
+ chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
+ TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
+ Map<String, List<ChunkMetadata>> sensorChunkMetadataListMap =
+ chunkMetadataListCacheForMergeEntry.getValue();
+ if (sensorChunkMetadataListMap.containsKey(sensor)) {
+ readerChunkMetadataListMap.put(reader, sensorChunkMetadataListMap.get(sensor));
+ sensorChunkMetadataListMap.remove(sensor);
}
}
- if (isPageEnoughLarge) {
- logger.debug("{} [Compaction] page enough large, use append merge", storageGroup);
- // append page in chunks, so we do not have to deserialize a chunk
- writeByAppendMerge(
- maxVersion,
- device,
- compactionWriteRateLimiter,
- sensorReaderChunkMetadataListEntry,
- targetResource,
- writer,
- modificationCache,
- modifications);
- } else {
- logger.debug("{} [Compaction] page too small, use deserialize merge", storageGroup);
- // we have to deserialize chunks to merge pages
+ Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>>
+ sensorReaderChunkMetadataListEntry =
+ new DefaultMapEntry<>(sensor, readerChunkMetadataListMap);
+ if (!sequence) {
writeByDeserializeMerge(
maxVersion,
device,
@@ -382,29 +350,65 @@ public class CompactionUtils {
writer,
modificationCache,
modifications);
+ } else {
+ boolean isPageEnoughLarge = true;
+ for (List<ChunkMetadata> chunkMetadatas : readerChunkMetadataListMap.values()) {
+ for (ChunkMetadata chunkMetadata : chunkMetadatas) {
+ if (chunkMetadata.getNumOfPoints() < MERGE_PAGE_POINT_NUM) {
+ isPageEnoughLarge = false;
+ break;
+ }
+ }
+ }
+ if (isPageEnoughLarge) {
+ logger.debug("{} [Compaction] page enough large, use append merge", storageGroup);
+ // append page in chunks, so we do not have to deserialize a chunk
+ writeByAppendMerge(
+ maxVersion,
+ device,
+ compactionWriteRateLimiter,
+ sensorReaderChunkMetadataListEntry,
+ targetResource,
+ writer,
+ modificationCache,
+ modifications);
+ } else {
+ logger.debug(
+ "{} [Compaction] page too small, use deserialize merge", storageGroup);
+ // we have to deserialize chunks to merge pages
+ writeByDeserializeMerge(
+ maxVersion,
+ device,
+ compactionWriteRateLimiter,
+ sensorReaderChunkMetadataListEntry,
+ targetResource,
+ writer,
+ modificationCache,
+ modifications);
+ }
}
}
}
}
+ writer.endChunkGroup();
+ writer.writeVersion(maxVersion);
+ if (compactionLogger != null) {
+ compactionLogger.logDevice(device, writer.getPos());
+ }
}
- writer.endChunkGroup();
- writer.writeVersion(maxVersion);
- if (compactionLogger != null) {
- compactionLogger.logDevice(device, writer.getPos());
+ Set<Long> historicalVersions = new HashSet<>();
+ for (TsFileResource tsFileResource : tsFileResources) {
+ historicalVersions.addAll(tsFileResource.getHistoricalVersions());
+ }
+ targetResource.setHistoricalVersions(historicalVersions);
+ targetResource.serialize();
+ writer.endFile();
+ targetResource.close();
+ } finally {
+ for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
+ reader.close();
}
}
-
- for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
- reader.close();
- }
- Set<Long> historicalVersions = new HashSet<>();
- for (TsFileResource tsFileResource : tsFileResources) {
- historicalVersions.addAll(tsFileResource.getHistoricalVersions());
- }
- targetResource.setHistoricalVersions(historicalVersions);
- targetResource.serialize();
- writer.endFile();
- targetResource.close();
}
private static TsFileSequenceReader buildReaderFromTsFileResource(
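The CompactionUtils change above is simpler than its line count suggests: the merge body is re-indented inside a try block, and the loop that closes every TsFileSequenceReader moves from the tail of the happy path into a finally clause, so an exception thrown partway through the merge no longer leaks open readers. A minimal sketch of that fix, with Reader standing in for TsFileSequenceReader and mergeWork as a hypothetical placeholder for the loop over devices and sensors:

    import java.io.IOException;
    import java.util.Map;

    // Sketch only: Reader stands in for TsFileSequenceReader.
    public class ReaderCleanupSketch {
        interface Reader extends AutoCloseable {
            @Override
            void close() throws IOException;
        }

        static void mergeAll(Map<String, Reader> readers, Runnable mergeWork) throws IOException {
            try {
                mergeWork.run(); // may throw at any point
            } finally {
                for (Reader reader : readers.values()) {
                    reader.close(); // runs on success and on failure alike
                }
            }
        }
    }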