You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/16 10:59:36 UTC
[iotdb] branch rel/0.11 updated: [To 0.11] fix unseq compaction
data-loss bug after delete operation (#2801)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 052b232 [To 0.11] fix unseq compaction data-loss bug after delete operation (#2801)
052b232 is described below
commit 052b232a7de30ef771a06d89af667ee71f247377
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Tue Mar 16 18:59:12 2021 +0800
[To 0.11] fix unseq compaction data-loss bug after delete operation (#2801)
---
.../db/engine/merge/manage/MergeResource.java | 56 ++++--
.../iotdb/db/engine/merge/task/MergeFileTask.java | 114 +++++++----
.../db/engine/merge/task/MergeMultiChunkTask.java | 211 ++++++++++++++-------
3 files changed, 254 insertions(+), 127 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d304d4c..bb4d069 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -60,7 +61,10 @@ public class MergeResource {
private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
private Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new HashMap<>();
private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
- private Map<PartialPath, MeasurementSchema> measurementSchemaMap = new HashMap<>(); //is this too waste?
+ private Map<TsFileResource, Map<String, Pair<Long, Long>>> startEndTimeCache =
+ new HashMap<>(); // pair<startTime, endTime>
+ private Map<PartialPath, MeasurementSchema> measurementSchemaMap =
+ new HashMap<>(); // is this too waste?
private Map<MeasurementSchema, IChunkWriter> chunkWriterCache = new ConcurrentHashMap<>();
private long timeLowerBound = Long.MIN_VALUE;
@@ -68,10 +72,8 @@ public class MergeResource {
private boolean cacheDeviceMeta = false;
public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
- this.seqFiles = seqFiles.stream().filter(this::filterResource)
- .collect(Collectors.toList());
- this.unseqFiles = unseqFiles.stream().filter(this::filterResource)
- .collect(Collectors.toList());
+ this.seqFiles = seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
+ this.unseqFiles = unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
}
/** If returns true, it means to participate in the merge */
@@ -81,13 +83,11 @@ public class MergeResource {
&& (!res.isClosed() || res.stillLives(timeLowerBound));
}
- public MergeResource(Collection<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
- long timeLowerBound) {
+ public MergeResource(
+ Collection<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, long timeLowerBound) {
this.timeLowerBound = timeLowerBound;
- this.seqFiles =
- seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
- this.unseqFiles =
- unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
+ this.seqFiles = seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
+ this.unseqFiles = unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
}
public void clear() throws IOException {
@@ -118,8 +118,9 @@ public class MergeResource {
public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
RestorableTsFileIOWriter writer = fileWriterCache.get(resource);
if (writer == null) {
- writer = new RestorableTsFileIOWriter(FSFactoryProducer.getFSFactory()
- .getFile(resource.getTsFilePath() + MERGE_SUFFIX));
+ writer =
+ new RestorableTsFileIOWriter(
+ FSFactoryProducer.getFSFactory().getFile(resource.getTsFilePath() + MERGE_SUFFIX));
fileWriterCache.put(resource, writer);
}
return writer;
@@ -183,8 +184,9 @@ public class MergeResource {
*/
public List<Modification> getModifications(TsFileResource tsFileResource, PartialPath path) {
// copy from TsFileResource so queries are not affected
- List<Modification> modifications = modificationCache.computeIfAbsent(tsFileResource,
- resource -> new LinkedList<>(resource.getModFile().getModifications()));
+ List<Modification> modifications =
+ modificationCache.computeIfAbsent(
+ tsFileResource, resource -> new LinkedList<>(resource.getModFile().getModifications()));
List<Modification> pathModifications = new ArrayList<>();
Iterator<Modification> modificationIterator = modifications.iterator();
while (modificationIterator.hasNext()) {
@@ -234,8 +236,7 @@ public class MergeResource {
return unseqFiles;
}
- public void setUnseqFiles(
- List<TsFileResource> unseqFiles) {
+ public void setUnseqFiles(List<TsFileResource> unseqFiles) {
this.unseqFiles = unseqFiles;
}
@@ -265,4 +266,25 @@ public class MergeResource {
this.chunkWriterCache.clear();
}
+ public void updateStartTime(TsFileResource tsFileResource, String device, long startTime) {
+ Map<String, Pair<Long, Long>> deviceStartEndTimePairMap =
+ startEndTimeCache.computeIfAbsent(tsFileResource, k -> new HashMap<>());
+ Pair<Long, Long> startEndTimePair =
+ deviceStartEndTimePairMap.computeIfAbsent(
+ device, k -> new Pair<>(Long.MAX_VALUE, Long.MIN_VALUE));
+ startEndTimePair.left = startEndTimePair.left > startTime ? startTime : startEndTimePair.left;
+ }
+
+ public void updateEndTime(TsFileResource tsFileResource, String device, long endTime) {
+ Map<String, Pair<Long, Long>> deviceStartEndTimePairMap =
+ startEndTimeCache.computeIfAbsent(tsFileResource, k -> new HashMap<>());
+ Pair<Long, Long> startEndTimePair =
+ deviceStartEndTimePairMap.computeIfAbsent(
+ device, k -> new Pair<>(Long.MAX_VALUE, Long.MIN_VALUE));
+ startEndTimePair.right = startEndTimePair.right < endTime ? endTime : startEndTimePair.right;
+ }
+
+ public Map<String, Pair<Long, Long>> getStartEndTime(TsFileResource tsFileResource) {
+ return startEndTimeCache.getOrDefault(tsFileResource, new HashMap<>());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 2685c1d..92914a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -67,8 +68,12 @@ class MergeFileTask {
private int currentMergeIndex;
private String currMergeFile;
- MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
- MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
+ MergeFileTask(
+ String taskName,
+ MergeContext context,
+ MergeLogger mergeLogger,
+ MergeResource resource,
+ List<TsFileResource> unmergedSeqFiles) {
this.taskName = taskName;
this.context = context;
this.mergeLogger = mergeLogger;
@@ -93,16 +98,24 @@ class MergeFileTask {
if (mergedChunkNum >= unmergedChunkNum) {
// move the unmerged data to the new file
if (logger.isInfoEnabled()) {
- logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
- + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
+ logger.info(
+ "{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+ + "unmerged chunks",
+ taskName,
+ seqFile.getTsFile().getName(),
+ mergedChunkNum,
unmergedChunkNum);
}
moveUnmergedToNew(seqFile);
} else {
// move the merged data to the old file
if (logger.isInfoEnabled()) {
- logger.info("{} moving merged data of {} to the old file {} merged chunks, {} "
- + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
+ logger.info(
+ "{} moving merged data of {} to the old file {} merged chunks, {} "
+ + "unmerged chunks",
+ taskName,
+ seqFile.getTsFile().getName(),
+ mergedChunkNum,
unmergedChunkNum);
}
moveMergedToOld(seqFile);
@@ -116,22 +129,27 @@ class MergeFileTask {
logProgress();
}
if (logger.isInfoEnabled()) {
- logger.info("{} has merged all files after {}ms", taskName,
- System.currentTimeMillis() - startTime);
+ logger.info(
+ "{} has merged all files after {}ms", taskName, System.currentTimeMillis() - startTime);
}
mergeLogger.logMergeEnd();
}
private void logProgress() {
if (logger.isDebugEnabled()) {
- logger.debug("{} has merged {}, processed {}/{} files", taskName, currMergeFile,
- currentMergeIndex + 1, unmergedFiles.size());
+ logger.debug(
+ "{} has merged {}, processed {}/{} files",
+ taskName,
+ currMergeFile,
+ currentMergeIndex + 1,
+ unmergedFiles.size());
}
}
public String getProgress() {
- return String.format("Merging %s, processed %d/%d files", currMergeFile,
- currentMergeIndex + 1, unmergedFiles.size());
+ return String.format(
+ "Merging %s, processed %d/%d files",
+ currMergeFile, currentMergeIndex + 1, unmergedFiles.size());
}
private void moveMergedToOld(TsFileResource seqFile) throws IOException {
@@ -150,8 +168,7 @@ class MergeFileTask {
TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile);
// filter the chunks that have been merged
- oldFileWriter.filterChunks(new HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile))
- );
+ oldFileWriter.filterChunks(new HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile)));
RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile);
newFileWriter.close();
@@ -160,11 +177,11 @@ class MergeFileTask {
Map<String, List<ChunkMetadata>> chunkMetadataListInChunkGroups =
newFileWriter.getDeviceChunkMetadataMap();
if (logger.isDebugEnabled()) {
- logger.debug("{} find {} merged chunk groups", taskName,
- chunkMetadataListInChunkGroups.size());
+ logger.debug(
+ "{} find {} merged chunk groups", taskName, chunkMetadataListInChunkGroups.size());
}
- for (Map.Entry<String, List<ChunkMetadata>> entry : chunkMetadataListInChunkGroups
- .entrySet()) {
+ for (Map.Entry<String, List<ChunkMetadata>> entry :
+ chunkMetadataListInChunkGroups.entrySet()) {
String deviceId = entry.getKey();
List<ChunkMetadata> chunkMetadataList = entry.getValue();
writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader, oldFileWriter);
@@ -193,15 +210,26 @@ class MergeFileTask {
}
private void updateStartTimeAndEndTime(TsFileResource seqFile, TsFileIOWriter fileWriter) {
- //TODO change to get one timeseries block each time
- for (Entry<String, List<ChunkMetadata>> deviceChunkMetadataEntry : fileWriter
- .getDeviceChunkMetadataMap().entrySet()) {
- String device = deviceChunkMetadataEntry.getKey();
- for (ChunkMetadata chunkMetadata : deviceChunkMetadataEntry.getValue()) {
- seqFile.updateStartTime(device, chunkMetadata.getStartTime());
- seqFile.updateEndTime(device, chunkMetadata.getEndTime());
+ // TODO change to get one timeseries block each time
+ Map<String, List<ChunkMetadata>> deviceChunkMetadataListMap =
+ fileWriter.getDeviceChunkMetadataMap();
+ for (Entry<String, List<ChunkMetadata>> deviceChunkMetadataListEntry :
+ deviceChunkMetadataListMap.entrySet()) {
+ String device = deviceChunkMetadataListEntry.getKey();
+ for (ChunkMetadata chunkMetadata : deviceChunkMetadataListEntry.getValue()) {
+ resource.updateStartTime(seqFile, device, chunkMetadata.getStartTime());
+ resource.updateEndTime(seqFile, device, chunkMetadata.getEndTime());
}
}
+ // update all device start time and end time of the resource
+ Map<String, Pair<Long, Long>> deviceStartEndTimePairMap = resource.getStartEndTime(seqFile);
+ for (Entry<String, Pair<Long, Long>> deviceStartEndTimePairEntry :
+ deviceStartEndTimePairMap.entrySet()) {
+ String device = deviceStartEndTimePairEntry.getKey();
+ Pair<Long, Long> startEndTimePair = deviceStartEndTimePairEntry.getValue();
+ seqFile.putStartTime(device, startEndTimePair.left);
+ seqFile.putEndTime(device, startEndTimePair.right);
+ }
}
/**
@@ -209,8 +237,8 @@ class MergeFileTask {
* aborted.
*/
private void restoreOldFile(TsFileResource seqFile) throws IOException {
- RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter(
- seqFile.getTsFile());
+ RestorableTsFileIOWriter oldFileRecoverWriter =
+ new RestorableTsFileIOWriter(seqFile.getTsFile());
if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) {
oldFileRecoverWriter.endFile();
} else {
@@ -218,15 +246,13 @@ class MergeFileTask {
}
}
- /**
- * Open an appending writer for an old seq file so we can add new chunks to it.
- */
+ /** Open an appending writer for an old seq file so we can add new chunks to it. */
private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws IOException {
TsFileIOWriter oldFileWriter;
try {
oldFileWriter = new ForceAppendTsFileWriter(seqFile.getTsFile());
- mergeLogger.logFileMergeStart(seqFile.getTsFile(),
- ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
+ mergeLogger.logFileMergeStart(
+ seqFile.getTsFile(), ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile);
((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
} catch (TsFileNotCompleteException e) {
@@ -248,8 +274,11 @@ class MergeFileTask {
}
}
- private void writeMergedChunkGroup(List<ChunkMetadata> chunkMetadataList, String device,
- TsFileSequenceReader reader, TsFileIOWriter fileWriter)
+ private void writeMergedChunkGroup(
+ List<ChunkMetadata> chunkMetadataList,
+ String device,
+ TsFileSequenceReader reader,
+ TsFileIOWriter fileWriter)
throws IOException {
fileWriter.startChunkGroup(device);
long maxVersion = 0;
@@ -288,8 +317,9 @@ class MergeFileTask {
}
fileWriter.startChunkGroup(path.getDevice());
- long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetadataList,
- resource.getFileReader(seqFile), fileWriter);
+ long maxVersion =
+ writeUnmergedChunks(
+ chunkStartTimes, chunkMetadataList, resource.getFileReader(seqFile), fileWriter);
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
@@ -329,9 +359,12 @@ class MergeFileTask {
}
}
- private long writeUnmergedChunks(List<Long> chunkStartTimes,
- List<ChunkMetadata> chunkMetadataList, TsFileSequenceReader reader,
- RestorableTsFileIOWriter fileWriter) throws IOException {
+ private long writeUnmergedChunks(
+ List<Long> chunkStartTimes,
+ List<ChunkMetadata> chunkMetadataList,
+ TsFileSequenceReader reader,
+ RestorableTsFileIOWriter fileWriter)
+ throws IOException {
long maxVersion = 0;
int chunkIdx = 0;
for (Long startTime : chunkStartTimes) {
@@ -353,5 +386,4 @@ class MergeFileTask {
}
return maxVersion;
}
-
-}
\ No newline at end of file
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index a57bcf7..8be0bfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -61,8 +61,8 @@ import org.slf4j.LoggerFactory;
public class MergeMultiChunkTask {
private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class);
- private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig()
- .getMergeChunkPointNumberThreshold();
+ private static int minChunkPointNum =
+ IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
private MergeLogger mergeLogger;
private List<PartialPath> unmergedSeries;
@@ -84,9 +84,15 @@ public class MergeMultiChunkTask {
private String storageGroupName;
- public MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger,
- MergeResource mergeResource, boolean fullMerge, List<PartialPath> unmergedSeries,
- int concurrentMergeSeriesNum, String storageGroupName) {
+ public MergeMultiChunkTask(
+ MergeContext context,
+ String taskName,
+ MergeLogger mergeLogger,
+ MergeResource mergeResource,
+ boolean fullMerge,
+ List<PartialPath> unmergedSeries,
+ int concurrentMergeSeriesNum,
+ String storageGroupName) {
this.mergeContext = context;
this.taskName = taskName;
this.mergeLogger = mergeLogger;
@@ -124,8 +130,8 @@ public class MergeMultiChunkTask {
}
}
if (logger.isInfoEnabled()) {
- logger.info("{} all series are merged after {}ms", taskName,
- System.currentTimeMillis() - startTime);
+ logger.info(
+ "{} all series are merged after {}ms", taskName, System.currentTimeMillis() - startTime);
}
mergeLogger.logAllTsEnd();
}
@@ -166,8 +172,7 @@ public class MergeMultiChunkTask {
mergeLogger.logTSEnd();
}
- private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders)
- throws IOException {
+ private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders) throws IOException {
TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx);
String deviceId = currMergingPaths.get(0).getDevice();
long currDeviceMinTime = currTsFile.getStartTime(deviceId);
@@ -196,6 +201,10 @@ public class MergeMultiChunkTask {
modifications[i] = resource.getModifications(currTsFile, currMergingPaths.get(i));
seqChunkMeta[i] = resource.queryChunkMetadata(currMergingPaths.get(i), currTsFile);
modifyChunkMetaData(seqChunkMeta[i], modifications[i]);
+ for (ChunkMetadata chunkMetadata : seqChunkMeta[i]) {
+ resource.updateStartTime(currTsFile, deviceId, chunkMetadata.getStartTime());
+ resource.updateEndTime(currTsFile, deviceId, chunkMetadata.getEndTime());
+ }
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
@@ -215,8 +224,14 @@ public class MergeMultiChunkTask {
}
// merge unseq data with seq data in this file or small chunks in this file into a larger chunk
mergeFileWriter.startChunkGroup(deviceId);
- boolean dataWritten = mergeChunks(seqChunkMeta, isLastFile, fileSequenceReader, unseqReaders,
- mergeFileWriter, currTsFile);
+ boolean dataWritten =
+ mergeChunks(
+ seqChunkMeta,
+ isLastFile,
+ fileSequenceReader,
+ unseqReaders,
+ mergeFileWriter,
+ currTsFile);
if (dataWritten) {
mergeFileWriter.writeVersion(0L);
mergeFileWriter.endChunkGroup();
@@ -230,8 +245,10 @@ public class MergeMultiChunkTask {
// series should also be written into a new chunk
List<Integer> ret = new ArrayList<>();
for (int i = 0; i < currMergingPaths.size(); i++) {
- if (seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty()
- && !(seqFileIdx + 1 == resource.getSeqFiles().size() && currTimeValuePairs[i] != null)) {
+ if (seqChunkMeta[i] == null
+ || seqChunkMeta[i].isEmpty()
+ && !(seqFileIdx + 1 == resource.getSeqFiles().size()
+ && currTimeValuePairs[i] != null)) {
continue;
}
ret.add(i);
@@ -240,13 +257,17 @@ public class MergeMultiChunkTask {
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- private boolean mergeChunks(List<ChunkMetadata>[] seqChunkMeta, boolean isLastFile,
- TsFileSequenceReader reader, IPointReader[] unseqReaders,
- RestorableTsFileIOWriter mergeFileWriter, TsFileResource currFile)
+ private boolean mergeChunks(
+ List<ChunkMetadata>[] seqChunkMeta,
+ boolean isLastFile,
+ TsFileSequenceReader reader,
+ IPointReader[] unseqReaders,
+ RestorableTsFileIOWriter mergeFileWriter,
+ TsFileResource currFile)
throws IOException {
int[] ptWrittens = new int[seqChunkMeta.length];
- int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig()
- .getMergeChunkSubThreadNum();
+ int mergeChunkSubTaskNum =
+ IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum();
MetaListEntry[] metaListEntries = new MetaListEntry[currMergingPaths.size()];
PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum];
for (int i = 0; i < mergeChunkSubTaskNum; i++) {
@@ -271,13 +292,19 @@ public class MergeMultiChunkTask {
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < mergeChunkSubTaskNum; i++) {
- futures.add(MergeManager.getINSTANCE()
- .submitChunkSubTask(new MergeChunkHeapTask(chunkIdxHeaps[i],
- metaListEntries, ptWrittens,
- reader,
- mergeFileWriter, unseqReaders,
- currFile,
- isLastFile, i)));
+ futures.add(
+ MergeManager.getINSTANCE()
+ .submitChunkSubTask(
+ new MergeChunkHeapTask(
+ chunkIdxHeaps[i],
+ metaListEntries,
+ ptWrittens,
+ reader,
+ mergeFileWriter,
+ unseqReaders,
+ currFile,
+ isLastFile,
+ i)));
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
@@ -297,49 +324,74 @@ public class MergeMultiChunkTask {
}
// add merge and unmerged chunk statistic
- mergeContext.getMergedChunkCnt().compute(currFile, (tsFileResource, anInt) -> anInt == null ?
- mergedChunkNum.get() : anInt + mergedChunkNum.get());
- mergeContext.getUnmergedChunkCnt().compute(currFile, (tsFileResource, anInt) -> anInt == null ?
- unmergedChunkNum.get() : anInt + unmergedChunkNum.get());
+ mergeContext
+ .getMergedChunkCnt()
+ .compute(
+ currFile,
+ (tsFileResource, anInt) ->
+ anInt == null ? mergedChunkNum.get() : anInt + mergedChunkNum.get());
+ mergeContext
+ .getUnmergedChunkCnt()
+ .compute(
+ currFile,
+ (tsFileResource, anInt) ->
+ anInt == null ? unmergedChunkNum.get() : anInt + unmergedChunkNum.get());
return mergedChunkNum.get() > 0;
}
-
/**
* merge a sequence chunk SK
- * <p>
- * 1. no need to write the chunk to .merge file when: isn't full merge & there isn't unclosed
+ *
+ * <p>1. no need to write the chunk to .merge file when: isn't full merge & there isn't unclosed
* chunk before & SK is big enough & SK isn't overflowed & SK isn't modified
+ *
* <p>
- * <p>
- * 2. write SK to .merge.file without compressing when: is full merge & there isn't unclosed chunk
- * before & SK is big enough & SK isn't overflowed & SK isn't modified
- * <p>
- * 3. other cases: need to unCompress the chunk and write 3.1 SK isn't overflowed 3.2 SK is
+ *
+ * <p>2. write SK to .merge.file without compressing when: is full merge & there isn't unclosed
+ * chunk before & SK is big enough & SK isn't overflowed & SK isn't modified
+ *
+ * <p>3. other cases: need to unCompress the chunk and write 3.1 SK isn't overflowed 3.2 SK is
* overflowed
*/
@SuppressWarnings("java:S2445") // avoid writing the same writer concurrently
- private int mergeChunkV2(ChunkMetadata currMeta, boolean chunkOverflowed,
- boolean chunkTooSmall, Chunk chunk, int lastUnclosedChunkPoint, int pathIdx,
- TsFileIOWriter mergeFileWriter, IPointReader unseqReader,
- IChunkWriter chunkWriter, TsFileResource currFile) throws IOException {
+ private int mergeChunkV2(
+ ChunkMetadata currMeta,
+ boolean chunkOverflowed,
+ boolean chunkTooSmall,
+ Chunk chunk,
+ int lastUnclosedChunkPoint,
+ int pathIdx,
+ TsFileIOWriter mergeFileWriter,
+ IPointReader unseqReader,
+ IChunkWriter chunkWriter,
+ TsFileResource currFile)
+ throws IOException {
int unclosedChunkPoint = lastUnclosedChunkPoint;
- boolean chunkModified = (currMeta.getDeleteIntervalList() != null &&
- !currMeta.getDeleteIntervalList().isEmpty());
+ boolean chunkModified =
+ (currMeta.getDeleteIntervalList() != null && !currMeta.getDeleteIntervalList().isEmpty());
// no need to write the chunk to .merge file
- if (!fullMerge && lastUnclosedChunkPoint == 0 && !chunkTooSmall && !chunkOverflowed
+ if (!fullMerge
+ && lastUnclosedChunkPoint == 0
+ && !chunkTooSmall
+ && !chunkOverflowed
&& !chunkModified) {
unmergedChunkNum.incrementAndGet();
- mergeContext.getUnmergedChunkStartTimes().get(currFile).get(currMergingPaths.get(pathIdx))
+ mergeContext
+ .getUnmergedChunkStartTimes()
+ .get(currFile)
+ .get(currMergingPaths.get(pathIdx))
.add(currMeta.getStartTime());
return 0;
}
// write SK to .merge.file without compressing
- if (fullMerge && lastUnclosedChunkPoint == 0 && !chunkTooSmall && !chunkOverflowed
+ if (fullMerge
+ && lastUnclosedChunkPoint == 0
+ && !chunkTooSmall
+ && !chunkOverflowed
&& !chunkModified) {
synchronized (mergeFileWriter) {
mergeFileWriter.writeChunk(chunk, currMeta);
@@ -356,8 +408,8 @@ public class MergeMultiChunkTask {
mergedChunkNum.incrementAndGet();
} else {
// 3.2 SK is overflowed, uncompress sequence chunk and merge with unseq chunk, then write
- unclosedChunkPoint += writeChunkWithUnseq(chunk, chunkWriter, unseqReader,
- currMeta.getEndTime(), pathIdx);
+ unclosedChunkPoint +=
+ writeChunkWithUnseq(chunk, chunkWriter, unseqReader, currMeta.getEndTime(), pathIdx);
mergedChunkNum.incrementAndGet();
}
@@ -374,8 +426,9 @@ public class MergeMultiChunkTask {
return unclosedChunkPoint;
}
- private int writeRemainingUnseq(IChunkWriter chunkWriter,
- IPointReader unseqReader, long timeLimit, int pathIdx) throws IOException {
+ private int writeRemainingUnseq(
+ IChunkWriter chunkWriter, IPointReader unseqReader, long timeLimit, int pathIdx)
+ throws IOException {
int ptWritten = 0;
while (currTimeValuePairs[pathIdx] != null
&& currTimeValuePairs[pathIdx].getTimestamp() < timeLimit) {
@@ -388,8 +441,13 @@ public class MergeMultiChunkTask {
return ptWritten;
}
- private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
- long chunkLimitTime, int pathIdx) throws IOException {
+ private int writeChunkWithUnseq(
+ Chunk chunk,
+ IChunkWriter chunkWriter,
+ IPointReader unseqReader,
+ long chunkLimitTime,
+ int pathIdx)
+ throws IOException {
int cnt = 0;
ChunkReader chunkReader = new ChunkReader(chunk, null);
while (chunkReader.hasNextSatisfiedPage()) {
@@ -400,8 +458,9 @@ public class MergeMultiChunkTask {
return cnt;
}
- private int mergeWriteBatch(BatchData batchData, IChunkWriter chunkWriter,
- IPointReader unseqReader, int pathIdx) throws IOException {
+ private int mergeWriteBatch(
+ BatchData batchData, IChunkWriter chunkWriter, IPointReader unseqReader, int pathIdx)
+ throws IOException {
int cnt = 0;
for (int i = 0; i < batchData.length(); i++) {
long time = batchData.getTimeByIndex(i);
@@ -443,11 +502,16 @@ public class MergeMultiChunkTask {
private int totalSeriesNum;
- public MergeChunkHeapTask(PriorityQueue<Integer> chunkIdxHeap,
- MetaListEntry[] metaListEntries, int[] ptWrittens,
+ public MergeChunkHeapTask(
+ PriorityQueue<Integer> chunkIdxHeap,
+ MetaListEntry[] metaListEntries,
+ int[] ptWrittens,
TsFileSequenceReader reader,
RestorableTsFileIOWriter mergeFileWriter,
- IPointReader[] unseqReaders, TsFileResource currFile, boolean isLastFile, int taskNum) {
+ IPointReader[] unseqReaders,
+ TsFileResource currFile,
+ boolean isLastFile,
+ int taskNum) {
this.chunkIdxHeap = chunkIdxHeap;
this.metaListEntries = metaListEntries;
this.ptWrittens = ptWrittens;
@@ -482,18 +546,28 @@ public class MergeMultiChunkTask {
MetaListEntry metaListEntry = metaListEntries[pathIdx];
ChunkMetadata currMeta = metaListEntry.current();
boolean isLastChunk = !metaListEntry.hasNext();
- boolean chunkOverflowed = MergeUtils
- .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
- boolean chunkTooSmall = MergeUtils
- .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
+ boolean chunkOverflowed =
+ MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+ boolean chunkTooSmall =
+ MergeUtils.isChunkTooSmall(
+ ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
Chunk chunk;
synchronized (reader) {
chunk = reader.readMemChunk(currMeta);
}
- ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
- ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
- currFile);
+ ptWrittens[pathIdx] =
+ mergeChunkV2(
+ currMeta,
+ chunkOverflowed,
+ chunkTooSmall,
+ chunk,
+ ptWrittens[pathIdx],
+ pathIdx,
+ mergeFileWriter,
+ unseqReaders[pathIdx],
+ chunkWriter,
+ currFile);
if (!isLastChunk) {
metaListEntry.next();
@@ -504,9 +578,8 @@ public class MergeMultiChunkTask {
// this only happens when the seqFiles do not contain this series, otherwise the remaining
// data will be merged with the last chunk in the seqFiles
if (isLastFile && currTimeValuePairs[pathIdx] != null) {
- ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
- Long.MAX_VALUE,
- pathIdx);
+ ptWrittens[pathIdx] +=
+ writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], Long.MAX_VALUE, pathIdx);
mergedChunkNum.incrementAndGet();
}
// the last merged chunk may still be smaller than the threshold, flush it anyway
@@ -527,8 +600,8 @@ public class MergeMultiChunkTask {
}
public String getProgress() {
- return String.format("Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(),
- totalSeriesNum);
+ return String.format(
+ "Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(), totalSeriesNum);
}
}
}