You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/17 11:26:49 UTC
[incubator-iotdb] branch dev_merge updated: add documentation and
code refinements
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_merge by this push:
new a301d7b add documentation and code refinements
a301d7b is described below
commit a301d7b89039de39eb9dd9bce3cd1d6a31991ec5
Author: 江天 <jt...@163.com>
AuthorDate: Wed Jul 17 19:24:25 2019 +0800
add documentation and code refinements
---
.../iotdb/db/engine/merge/manage/MergeManager.java | 4 +
.../db/engine/merge/manage/MergeResource.java | 17 +++-
.../iotdb/db/engine/merge/recover/LogAnalyzer.java | 21 ++++-
.../iotdb/db/engine/merge/recover/MergeLogger.java | 17 ++--
.../engine/merge/selector/MergeFileSelector.java | 70 ++++------------
.../iotdb/db/engine/merge/task/MergeCallback.java | 1 +
.../{MergeSeriesTask.java => MergeChunkTask.java} | 98 +++++++++++++---------
.../iotdb/db/engine/merge/task/MergeFileTask.java | 42 +++++-----
.../iotdb/db/engine/merge/task/MergeTask.java | 33 ++++----
.../db/engine/merge/task/RecoverMergeTask.java | 42 +++++-----
.../java/org/apache/iotdb/db/utils/MergeUtils.java | 31 +++++++
.../iotdb/tsfile/read/controller/ChunkLoader.java | 4 +
.../tsfile/read/controller/ChunkLoaderImpl.java | 4 +
13 files changed, 220 insertions(+), 164 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index 98132c1..a04e0f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -33,6 +33,10 @@ import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * MergeManager provides a ThreadPool to queue and run all merge tasks to restrain the total
+ * resources occupied by merge and manages a Timer to periodically issue a global merge.
+ */
public class MergeManager implements IService {
private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d8cc445..7792828 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -45,12 +45,18 @@ import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+/**
+ * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
+ * modifications to avoid unnecessary object creations and file openings.
+ */
public class MergeResource {
private List<TsFileResource> seqFiles;
private List<TsFileResource> unseqFiles;
// future feature
+ // keeping ChunkMetadata in memory avoids reading them again when we need to move unmerged
+ // chunks to the merged file, but this may consume memory considerably
private boolean keepChunkMetadata = false;
private QueryContext mergeContext = new QueryContext();
@@ -60,6 +66,7 @@ public class MergeResource {
private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
private Map<TsFileResource, MetadataQuerier> metadataQuerierCache = new HashMap<>();
private Map<String, MeasurementSchema> measurementSchemaMap = new HashMap<>();
+ private Map<MeasurementSchema, IChunkWriter> chunkWriterCache = new HashMap<>();
public MergeResource(
List<TsFileResource> seqFiles,
@@ -77,10 +84,12 @@ public class MergeResource {
fileWriterCache.clear();
modificationCache.clear();
metadataQuerierCache.clear();
+ measurementSchemaMap.clear();
+ chunkWriterCache.clear();
}
- public MeasurementSchema getSchema(Path path) {
- return measurementSchemaMap.get(path.getMeasurement());
+ public MeasurementSchema getSchema(String measurementId) {
+ return measurementSchemaMap.get(measurementId);
}
public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
@@ -117,8 +126,8 @@ public class MergeResource {
}
public IChunkWriter getChunkWriter(MeasurementSchema measurementSchema) {
- return new ChunkWriterImpl(measurementSchema,
- new ChunkBuffer(measurementSchema), TSFileConfig.pageCheckSizeThreshold);
+ return chunkWriterCache.computeIfAbsent(measurementSchema,
+ k -> new ChunkWriterImpl(k, new ChunkBuffer(k), TSFileConfig.pageCheckSizeThreshold));
}
public TsFileSequenceReader getFileReader(TsFileResource tsFileResource) throws IOException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
index 5c7a004..19ed3d1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
@@ -37,6 +37,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * LogAnalyzer scans the "merge.log" file and recovers information such as files of last merge,
+ * the last available positions of each file and how many timeseries and files have been merged.
+ */
public class LogAnalyzer {
private static final Logger logger = LoggerFactory.getLogger(LogAnalyzer.class);
@@ -59,6 +63,12 @@ public class LogAnalyzer {
this.logFile = logFile;
}
+ /**
+ * Scan through the logs to find out where the last merge has stopped and store the information
+ * about the progress in the fields.
+ * @return a Status indicating the completed stage of the last merge.
+ * @throws IOException
+ */
public Status analyze() throws IOException {
Status status = Status.NONE;
try (BufferedReader bufferedReader =
@@ -135,7 +145,7 @@ public class LogAnalyzer {
}
}
if (logger.isDebugEnabled()) {
- logger.debug("{} found {} seq files after {}ms", taskName, mergeUnseqFiles.size(),
+ logger.debug("{} found {} unseq files after {}ms", taskName, mergeUnseqFiles.size(),
(System.currentTimeMillis() - startTime));
}
resource.setUnseqFiles(mergeUnseqFiles);
@@ -208,7 +218,14 @@ public class LogAnalyzer {
}
public enum Status {
- NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END
+ // almost nothing has been done
+ NONE,
+ // at least the files to be merged are known
+ FILES_LOGGED,
+ // all the timeseries have been merged (merged chunks are generated)
+ ALL_TS_MERGED,
+ // all the merge files are merged with the origin files and the job is almost done
+ MERGE_END
}
public List<Path> getUnmergedPaths() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
index 3e38936..24bf620 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
@@ -28,17 +28,20 @@ import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.read.common.Path;
+/**
+ * MergeLogger records the progress of a merge in file "merge.log" as text lines.
+ */
public class MergeLogger {
public static final String MERGE_LOG_NAME = "merge.log";
- public static final String STR_SEQ_FILES = "seqFiles";
- public static final String STR_UNSEQ_FILES = "unseqFiles";
- public static final String STR_START = "start";
- public static final String STR_END = "end";
- public static final String STR_ALL_TS_END = "all ts end";
- public static final String STR_MERGE_START = "merge start";
- public static final String STR_MERGE_END = "merge end";
+ static final String STR_SEQ_FILES = "seqFiles";
+ static final String STR_UNSEQ_FILES = "unseqFiles";
+ static final String STR_START = "start";
+ static final String STR_END = "end";
+ static final String STR_ALL_TS_END = "all ts end";
+ static final String STR_MERGE_START = "merge start";
+ static final String STR_MERGE_END = "merge end";
private BufferedWriter logStream;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java
index 136a4f2..b826960 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java
@@ -27,10 +27,10 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
@@ -56,7 +56,6 @@ public class MergeFileSelector {
private long totalCost;
private long memoryBudget;
private long maxSeqFileCost;
- private long tempMaxSeqFileCost;
private Map<TsFileResource, Long> fileMetaCostMap = new HashMap<>();
private Map<TsFileResource, Long> maxSeriesCostMap = new HashMap<>();
@@ -67,9 +66,9 @@ public class MergeFileSelector {
private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
private List<Integer> tmpSelectedSeqFiles;
+ private long tempMaxSeqFileCost;
private boolean[] seqSelected;
- private boolean[] unseqSelected;
public MergeFileSelector(
List<TsFileResource> seqFiles,
@@ -86,22 +85,22 @@ public class MergeFileSelector {
* plus the new cost is still under the budget, accept the unseqFile and the seqFiles as
* candidates, otherwise go to the next iteration.
* The memory cost of a file is calculated in two ways:
- * The rough estimation: for a seqFile, the size of its metadata is the estimation. Since in
- * the worst case, the file only contains one timeseries and all its metadata will be loaded
- * into memory with at most one actual data chunk (which is negligible) and writing the
- * timeseries into a new file generate metadata of the similar size, so the size of all
+ * The rough estimation: for a seqFile, the size of its metadata is used for estimation.
+ * Since in the worst case, the file only contains one timeseries and all its metadata will
+ * be loaded into memory with at most one actual data page (which is negligible) and writing
+ * the timeseries into a new file generates metadata of a similar size, so the size of all
* seqFiles' metadata (generated when writing new chunks) plus the largest one (loaded
* when reading a timeseries from the seqFiles) is the total estimation of all seqFiles; for
- * an unseqFile, since the merge reader may read all its chunks to perform a merge read, the
- * whole file may be loaded into memory and for the same reason of writing series, so we use
- * the file's length as the maximum estimation.
- * The tight estimation: based on the rough estimation, we scan the file's meta data to
- * count the number of chunks for each series, and the series which have the most chunks in
- * the file and its chunk proportion to shrink the rough estimation.
+ * an unseqFile, since the merge reader may read all chunks of a series to perform a merge
+ * read, the whole file may be loaded into memory, so we use the file's length as the
+ * maximum estimation.
+ * The tight estimation: based on the rough estimation, we scan the file's metadata to
+ * count the number of chunks for each series, find the series which have the most
+ * chunks in the file and use its chunk proportion to refine the rough estimation.
* The rough estimation is performed first, if no candidates can be found using rough
* estimation, we run the selection again with tight estimation.
* @return two lists of TsFileResource, the former is selected seqFiles and the latter is
- * selected unseqFiles or an empty array if there is no proper candidates by the budget.
+ * selected unseqFiles or an empty array if there are no proper candidates by the budget.
* @throws MergeException
*/
public List[] doSelect() throws MergeException {
@@ -113,7 +112,7 @@ public class MergeFileSelector {
if (selectedUnseqFiles.isEmpty()) {
doSelect(true);
}
- clean();
+ clear();
if (selectedUnseqFiles.isEmpty()) {
logger.info("No merge candidates are found");
return new List[0];
@@ -130,7 +129,7 @@ public class MergeFileSelector {
return new List[]{selectedSeqFiles, selectedUnseqFiles};
}
- private void clean() throws IOException {
+ private void clear() throws IOException {
for (TsFileSequenceReader reader : fileReaderCache.values()) {
reader.close();
}
@@ -141,17 +140,12 @@ public class MergeFileSelector {
tmpSelectedSeqFiles = new ArrayList<>();
seqSelected = new boolean[seqFiles.size()];
- unseqSelected = new boolean[unseqFiles.size()];
totalCost = 0;
int unseqIndex = 0;
while (unseqIndex < unseqFiles.size()) {
// select next unseq files
- if (unseqSelected[unseqIndex]) {
- unseqIndex++;
- continue;
- }
TsFileResource unseqFile = unseqFiles.get(unseqIndex);
selectOverlappedSeqFiles(unseqFile);
@@ -162,7 +156,6 @@ public class MergeFileSelector {
if (totalCost + newCost < memoryBudget) {
selectedUnseqFiles.add(unseqFile);
- unseqSelected[unseqIndex] = true;
maxSeqFileCost = tempMaxSeqFileCost;
for (Integer seqIdx : tmpSelectedSeqFiles) {
@@ -185,7 +178,7 @@ public class MergeFileSelector {
continue;
}
TsFileResource seqFile = seqFiles.get(i);
- if (fileOverlap(seqFile, unseqFile)) {
+ if (MergeUtils.fileOverlap(seqFile, unseqFile)) {
tmpSelectedSeqFiles.add(i);
}
}
@@ -206,7 +199,7 @@ public class MergeFileSelector {
cost += fileCost;
tempMaxSeqFileCost = fileCost;
}
- // but writing data into a new file may generate the same amount of metadata
+ // but writing data into a new file may generate the same amount of metadata in memory
cost += fileCost;
}
return cost;
@@ -321,35 +314,6 @@ public class MergeFileSelector {
}
- private boolean fileOverlap(TsFileResource seqFile, TsFileResource unseqFile) {
- Map<String, Long> seqStartTimes = seqFile.getStartTimeMap();
- Map<String, Long> seqEndTimes = seqFile.getEndTimeMap();
- Map<String, Long> unseqStartTimes = unseqFile.getStartTimeMap();
- Map<String, Long> unseqEndTimes = unseqFile.getEndTimeMap();
-
- for (Entry<String, Long> seqEntry : seqStartTimes.entrySet()) {
- Long unseqStartTime = unseqStartTimes.get(seqEntry.getKey());
- if (unseqStartTime == null) {
- continue;
- }
- Long unseqEndTime = unseqEndTimes.get(seqEntry.getKey());
- Long seqStartTime = seqEntry.getValue();
- Long seqEndTime = seqEndTimes.get(seqEntry.getKey());
-
- if (intervalOverlap(seqStartTime, seqEndTime, unseqStartTime, unseqEndTime)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean intervalOverlap(long l1, long r1, long l2, long r2) {
- return (l1 <= l2 && l2 <= r1) ||
- (l1 <= r2 && r2 <= r1) ||
- (l2 <= l1 && l1 <= r2) ||
- (l2 <= r1 && r1 <= r2);
- }
-
private TsFileSequenceReader getReader(TsFileResource tsFileResource) throws IOException {
TsFileSequenceReader reader = fileReaderCache.get(tsFileResource);
if (reader == null) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeCallback.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeCallback.java
index 7377a2c..c66edee 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeCallback.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeCallback.java
@@ -32,6 +32,7 @@ public interface MergeCallback {
* modifications have been merged into the new files.
* 2. remove the unseqFiles since they have been merged into new files.
* 3. remove the merge log file
+ * 4. exit merging status
* @param seqFiles
* @param unseqFiles
*/
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeSeriesTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeChunkTask.java
similarity index 79%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeSeriesTask.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeChunkTask.java
index e6ec446..165bc3d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeSeriesTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeChunkTask.java
@@ -29,12 +29,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,8 +42,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -52,9 +50,16 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class MergeSeriesTask {
+/**
+ * MergeChunkTask rewrites chunks that satisfy the following conditions into the merge temp file:
+ * 1. the number of points in this chunk (size of chunk) < minChunkPointNum
+ * 2. the previous chunks are rewritten but their total size is still less than minChunkPointNum
+ * 3. the chunk is overflowed (contains the duplicated data with some unseq chunks)
+ * 4. the chunk is updated/deleted
+ */
+class MergeChunkTask {
- private static final Logger logger = LoggerFactory.getLogger(MergeSeriesTask.class);
+ private static final Logger logger = LoggerFactory.getLogger(MergeChunkTask.class);
private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
private Map<TsFileResource, Integer> mergedChunkCnt;
@@ -71,7 +76,7 @@ class MergeSeriesTask {
int totalChunkWritten;
- MergeSeriesTask(
+ MergeChunkTask(
Map<TsFileResource, Integer> mergedChunkCnt,
Map<TsFileResource, Integer> unmergedChunkCnt,
Map<TsFileResource, Map<Path, List<Long>>> unmergedChunkStartTimes,
@@ -95,7 +100,7 @@ class MergeSeriesTask {
for (TsFileResource seqFile : resource.getSeqFiles()) {
unmergedChunkStartTimes.put(seqFile, new HashMap<>());
}
- // merge each series and write data into each seqFile's temp merge file
+ // merge each series and write data into each seqFile's corresponding temp merge file
int mergedCnt = 0;
double progress = 0.0;
for (Path path : unmergedSeries) {
@@ -119,7 +124,7 @@ class MergeSeriesTask {
private void mergeOnePath(Path path) throws IOException {
IPointReader unseqReader = resource.getUnseqReader(path);
- MeasurementSchema schema = resource.getSchema(path);
+ MeasurementSchema schema = resource.getSchema(path.getMeasurement());
try {
if (unseqReader.hasNext()) {
currTimeValuePair = unseqReader.next();
@@ -157,28 +162,32 @@ class MergeSeriesTask {
}
TsFileSequenceReader fileSequenceReader = resource.getFileReader(currTsFile);
- ChunkLoader chunkLoader = new ChunkLoaderImpl(fileSequenceReader);
List<Modification> modifications = resource.getModifications(currTsFile, path);
List<ChunkMetaData> seqChunkMeta = resource.queryChunkMetadata(path, currTsFile);
- if (seqChunkMeta.isEmpty()) {
+ // if the last seqFile does not contain this series but the unseqFiles do, data of this
+ // series should also be written into a new chunk
+ if (seqChunkMeta.isEmpty()
+ && !(seqFileIdx + 1 == resource.getSeqFiles().size() && currTimeValuePair != null)) {
return;
}
modifyChunkMetaData(seqChunkMeta, modifications);
RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(currTsFile);
currDeviceMaxTime = currTsFile.getEndTimeMap().get(path.getDevice());
- // merge unseq data with this file or small chunks in this file into a larger chunk
+ // merge unseq data with seq data in this file or small chunks in this file into a larger chunk
mergeFileWriter.startChunkGroup(deviceId);
- if (mergeChunks(seqChunkMeta, fileLimitTime, chunkLoader, measurementSchema,
+ if (mergeChunks(seqChunkMeta, fileLimitTime, fileSequenceReader, measurementSchema,
unseqReader, mergeFileWriter, currTsFile, path)) {
- mergeFileWriter.endChunkGroup(seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion() + 1);
+ long version = !seqChunkMeta.isEmpty() ?
+ seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion() + 1 : 0;
+ mergeFileWriter.endChunkGroup(version);
mergeLogger.logFilePositionUpdate(mergeFileWriter.getFile());
}
currTsFile.updateTime(path.getDevice(), currDeviceMaxTime);
}
private boolean mergeChunks(List<ChunkMetaData> seqChunkMeta, long fileLimitTime,
- ChunkLoader chunkLoader, MeasurementSchema measurementSchema,
+ TsFileSequenceReader reader, MeasurementSchema measurementSchema,
IPointReader unseqReader, RestorableTsFileIOWriter mergeFileWriter, TsFileResource currFile
, Path path)
throws IOException {
@@ -189,12 +198,10 @@ class MergeSeriesTask {
for (int i = 0; i < seqChunkMeta.size(); i++) {
ChunkMetaData currMeta = seqChunkMeta.get(i);
boolean isLastChunk = i + 1 == seqChunkMeta.size();
- long chunkLimitTime = i + 1 < seqChunkMeta.size() ? seqChunkMeta.get(i + 1).getStartTime()
- : fileLimitTime;
+ long chunkLimitTime = isLastChunk ? fileLimitTime : seqChunkMeta.get(i + 1).getStartTime();
- int newPtWritten = writeChunk(chunkLimitTime, ptWritten, chunkLoader, currMeta, chunkWriter,
- isLastChunk,
- measurementSchema, unseqReader);
+ int newPtWritten = writeChunk(chunkLimitTime, ptWritten, reader, currMeta, chunkWriter,
+ isLastChunk, measurementSchema, unseqReader);
if (newPtWritten > ptWritten) {
mergedChunkNum ++;
@@ -210,8 +217,16 @@ class MergeSeriesTask {
ptWritten = 0;
}
}
+ // this only happens when the last seqFile does not contain this series, otherwise the remaining
+ // parts will be merged with the last chunk in the seqFile
+ if (fileLimitTime == Long.MAX_VALUE && currTimeValuePair != null) {
+ ptWritten += writeRemainingUnseq(chunkWriter, measurementSchema.getType(),
+ unseqReader, fileLimitTime);
+ mergedChunkNum ++;
+ }
+
+ // the last merged chunk may still be smaller than the threshold, flush it anyway
if (ptWritten > 0) {
- // the last merged chunk may still be smaller than the threshold, flush it anyway
chunkWriter.writeToFileWriter(mergeFileWriter);
}
updateChunkCounts(currFile, mergedChunkNum, unmergedChunkNum);
@@ -219,6 +234,20 @@ class MergeSeriesTask {
return mergedChunkNum > 0;
}
+ private int writeRemainingUnseq(IChunkWriter chunkWriter,
+ TSDataType dataType, IPointReader unseqReader, long timeLimit) throws IOException {
+ int ptWritten = 0;
+ while (currTimeValuePair != null && currTimeValuePair.getTimestamp() < timeLimit) {
+ writeTVPair(currTimeValuePair, chunkWriter, dataType);
+ if (currTimeValuePair.getTimestamp() > currDeviceMaxTime) {
+ currDeviceMaxTime = currTimeValuePair.getTimestamp();
+ }
+ ptWritten ++;
+ currTimeValuePair = unseqReader.hasNext() ? unseqReader.next() : null;
+ }
+ return ptWritten;
+ }
+
private void updateChunkCounts(TsFileResource currFile, int newMergedChunkNum,
int newUnmergedChunkNum) {
mergedChunkCnt.compute(currFile, (tsFileResource, anInt) -> anInt == null ? newMergedChunkNum
@@ -227,28 +256,28 @@ class MergeSeriesTask {
: anInt + newUnmergedChunkNum);
}
- private int writeChunk(long chunkLimitTime, int ptWritten, ChunkLoader chunkLoader,
+ private int writeChunk(long chunkLimitTime, int ptWritten, TsFileSequenceReader reader,
ChunkMetaData currMeta, IChunkWriter chunkWriter, boolean isLastChunk,
MeasurementSchema measurementSchema, IPointReader unseqReader) throws IOException {
- // unseq data is not over and this chunk's time range cover the overflow point
- boolean chunkOverlap =
+ // unseq data is not over and this chunk's time range covers the current overflow point
+ boolean chunkOverflowed =
currTimeValuePair != null && currTimeValuePair.getTimestamp() < chunkLimitTime;
- // a small chunk has been written, this chunk merge with it to create a larger chunk
- // or this chunk is too small and it is not the last chunk, merge it with the next chunk
+ // a small chunk has been written, this chunk should be merged with it to create a larger chunk
+ // or this chunk is too small and it is not the last chunk, merge it with the next chunks
boolean chunkTooSmall =
ptWritten > 0 || (minChunkPointNum >= 0 && currMeta.getNumOfPoints() < minChunkPointNum && !isLastChunk);
boolean chunkModified = currMeta.getDeletedAt() > Long.MIN_VALUE;
int newPtWritten = ptWritten;
- if (!chunkOverlap && (chunkTooSmall || chunkModified || fullMerge)) {
+ if (!chunkOverflowed && (chunkTooSmall || chunkModified || fullMerge)) {
// just rewrite the (modified) chunk
- Chunk chunk = chunkLoader.getChunk(currMeta);
+ Chunk chunk = reader.readMemChunk(currMeta);
newPtWritten += MergeUtils.writeChunkWithoutUnseq(chunk, chunkWriter, measurementSchema);
totalChunkWritten ++;
- } else if (chunkOverlap) {
+ } else if (chunkOverflowed) {
// this chunk may merge with the current point
- Chunk chunk = chunkLoader.getChunk(currMeta);
+ Chunk chunk = reader.readMemChunk(currMeta);
newPtWritten += writeChunkWithUnseq(chunk, chunkWriter, measurementSchema.getType(),
unseqReader, chunkLimitTime);
totalChunkWritten ++;
@@ -264,14 +293,7 @@ class MergeSeriesTask {
BatchData batchData = chunkReader.nextBatch();
cnt += mergeWriteBatch(batchData, chunkWriter, dataType, unseqReader);
}
- while (currTimeValuePair != null && currTimeValuePair.getTimestamp() < chunkLimitTime) {
- writeTVPair(currTimeValuePair, chunkWriter, dataType);
- if (currTimeValuePair.getTimestamp() > currDeviceMaxTime) {
- currDeviceMaxTime = currTimeValuePair.getTimestamp();
- }
- currTimeValuePair = unseqReader.hasNext() ? unseqReader.next() : null;
- cnt ++;
- }
+ cnt += writeRemainingUnseq(chunkWriter, dataType, unseqReader, chunkLimitTime);
return cnt;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index af129cb..6cb36f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -26,8 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -35,8 +35,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -46,6 +44,11 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged
+ * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp
+ * files, depending on which one is the majority.
+ */
class MergeFileTask {
private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
@@ -73,7 +76,7 @@ class MergeFileTask {
}
void mergeFiles() throws IOException {
- // decide whether to write the unmerged chunks to the merge files or to move the merged data
+ // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
// back to the origin seqFile's
if (logger.isInfoEnabled()) {
logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
@@ -128,20 +131,19 @@ class MergeFileTask {
// this file may already be truncated if this merge is a system reboot merge
oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile());
}
-
+ // filter the chunks that have been merged
oldFileWriter.filterChunks(unmergedChunkStartTimes.get(seqFile));
RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile);
newFileWriter.close();
- try ( TsFileSequenceReader newFileReader =
+ try (TsFileSequenceReader newFileReader =
new TsFileSequenceReader(newFileWriter.getFile().getPath())) {
- ChunkLoader chunkLoader = new ChunkLoaderImpl(newFileReader);
List<ChunkGroupMetaData> chunkGroupMetadataList = newFileWriter.getChunkGroupMetaDatas();
if (logger.isDebugEnabled()) {
logger.debug("{} find {} merged chunk groups", taskName, chunkGroupMetadataList.size());
}
for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetadataList) {
- writeMergedChunkGroup(chunkGroupMetaData, chunkLoader, oldFileWriter);
+ writeMergedChunkGroup(chunkGroupMetaData, newFileReader, oldFileWriter);
}
}
oldFileWriter.endFile(new FileSchema(oldFileWriter.getKnownSchema()));
@@ -157,27 +159,23 @@ class MergeFileTask {
}
private void writeMergedChunkGroup(ChunkGroupMetaData chunkGroupMetaData,
- ChunkLoader chunkLoader, TsFileIOWriter fileWriter)
+ TsFileSequenceReader reader, TsFileIOWriter fileWriter)
throws IOException {
fileWriter.startChunkGroup(chunkGroupMetaData.getDeviceID());
- long maxVersion = Long.MIN_VALUE;
+ long version = chunkGroupMetaData.getVersion();
for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
- Path path = new Path(chunkGroupMetaData.getDeviceID(), chunkMetaData.getMeasurementUid());
- MeasurementSchema measurementSchema = resource.getSchema(path);
+ MeasurementSchema measurementSchema = resource.getSchema(chunkMetaData.getMeasurementUid());
IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
- Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ Chunk chunk = reader.readMemChunk(chunkMetaData);
writeChunkWithoutUnseq(chunk, chunkWriter, measurementSchema);
chunkWriter.writeToFileWriter(fileWriter);
- maxVersion = maxVersion < chunkMetaData.getVersion() ? chunkMetaData.getVersion() :
- maxVersion;
}
- fileWriter.endChunkGroup(maxVersion + 1);
+ fileWriter.endChunkGroup(version + 1);
}
private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
Map<Path, List<Long>> fileUnmergedChunkStartTimes = this.unmergedChunkStartTimes.get(seqFile);
RestorableTsFileIOWriter fileWriter = resource.getMergeFileWriter(seqFile);
- ChunkLoader chunkLoader = new ChunkLoaderImpl(resource.getFileReader(seqFile));
mergeLogger.logFileMergeStart(fileWriter.getFile(), fileWriter.getFile().length());
logger.debug("{} moving unmerged chunks of {} to the new file", taskName, seqFile);
@@ -193,7 +191,7 @@ class MergeFileTask {
}
List<ChunkMetaData> chunkMetaDataList = resource.queryChunkMetadata(path, seqFile);
- MeasurementSchema measurementSchema = resource.getSchema(path);
+ MeasurementSchema measurementSchema = resource.getSchema(path.getMeasurement());
IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
if (logger.isDebugEnabled()) {
@@ -201,8 +199,8 @@ class MergeFileTask {
}
fileWriter.startChunkGroup(path.getDevice());
- long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList, chunkLoader,
- chunkWriter, measurementSchema, fileWriter);
+ long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList,
+ resource.getFileReader(seqFile), chunkWriter, measurementSchema, fileWriter);
fileWriter.endChunkGroup(maxVersion + 1);
}
}
@@ -223,7 +221,7 @@ class MergeFileTask {
}
private long writeUnmergedChunks(List<Long> chunkStartTimes,
- List<ChunkMetaData> chunkMetaDataList, ChunkLoader chunkLoader, IChunkWriter chunkWriter,
+ List<ChunkMetaData> chunkMetaDataList, TsFileSequenceReader reader, IChunkWriter chunkWriter,
MeasurementSchema measurementSchema, RestorableTsFileIOWriter fileWriter) throws IOException {
long maxVersion = 0;
int chunkIdx = 0;
@@ -231,7 +229,7 @@ class MergeFileTask {
for (; chunkIdx < chunkMetaDataList.size(); chunkIdx ++) {
ChunkMetaData metaData = chunkMetaDataList.get(chunkIdx);
if (metaData.getStartTime() == startTime) {
- Chunk chunk = chunkLoader.getChunk(metaData);
+ Chunk chunk = reader.readMemChunk(metaData);
writeChunkWithoutUnseq(chunk, chunkWriter, measurementSchema);
chunkWriter.writeToFileWriter(fileWriter);
maxVersion = metaData.getVersion() > maxVersion ? metaData.getVersion() : maxVersion;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 64dd4da..c743280 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -35,7 +35,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * MergeTask merges given SeqFiles and UnseqFiles into a new one.
+ * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
+ * 2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
+ * chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
+ * 3. remove unseqFiles
*/
public class MergeTask implements Callable<Void> {
@@ -51,9 +55,7 @@ public class MergeTask implements Callable<Void> {
Map<TsFileResource, Map<Path, List<Long>>> unmergedChunkStartTimes = new HashMap<>();
private MergeCallback callback;
-
String taskName;
-
boolean fullMerge;
public MergeTask(List<TsFileResource> seqFiles,
@@ -72,9 +74,10 @@ public class MergeTask implements Callable<Void> {
doMerge();
} catch (Exception e) {
logger.error("Runtime exception in merge {}", taskName, e);
- resource.setSeqFiles(Collections.emptyList());
- resource.setUnseqFiles(Collections.emptyList());
- cleanUp(true);
+ cleanUp(false);
+ // call the callback to make sure the StorageGroup exits the merging status, but pass 2
+ // empty file lists to avoid files being deleted.
+ callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME));
throw e;
}
return null;
@@ -89,12 +92,13 @@ public class MergeTask implements Callable<Void> {
long totalFileSize = MergeUtils.collectFileSizes(resource.getSeqFiles(),
resource.getUnseqFiles());
mergeLogger = new MergeLogger(storageGroupDir);
+
mergeLogger.logFiles(resource);
List<Path> unmergedSeries = MergeUtils.collectPaths(resource);
- MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
+ MergeChunkTask mergeChunkTask = new MergeChunkTask(mergedChunkCnt, unmergedChunkCnt,
unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, unmergedSeries);
- mergeSeriesTask.mergeSeries();
+ mergeChunkTask.mergeSeries();
MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
@@ -102,20 +106,18 @@ public class MergeTask implements Callable<Void> {
cleanUp(true);
if (logger.isInfoEnabled()) {
- double elapsedTime = (double) (System.currentTimeMillis() - startTime);
+ double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
double byteRate = totalFileSize / elapsedTime / 1024 / 1024 * 1000;
double seriesRate = unmergedSeries.size() / elapsedTime * 1000;
- double chunkRate = mergeSeriesTask.totalChunkWritten / elapsedTime * 1000;
+ double chunkRate = mergeChunkTask.totalChunkWritten / elapsedTime * 1000;
double fileRate =
(resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime * 1000;
- logger.info("{} ends after {}ms, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+ logger.info("{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+ "fileRate: {}/s",
taskName, elapsedTime, byteRate, seriesRate, chunkRate, fileRate);
}
}
-
-
void cleanUp(boolean executeCallback) throws IOException {
logger.info("{} is cleaning up", taskName);
@@ -134,12 +136,11 @@ public class MergeTask implements Callable<Void> {
File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
if (executeCallback) {
- // make sure merge.log is not deleted before unseqFiles are cleared
+ // make sure merge.log is not deleted until unseqFiles are cleared so that when the system
+ // reboots, the undeleted files can be deleted again
callback.call(resource.getSeqFiles(), resource.getUnseqFiles(), logFile);
} else {
logFile.delete();
}
}
-
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 809be34..6c12c16 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -37,13 +37,16 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * RecoverMergeTask is an extension of MergeTask, which resumes the last merge progress by
+ * scanning merge.log using LogAnalyzer and continue the unfinished merge.
+ */
public class RecoverMergeTask extends MergeTask {
private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
private LogAnalyzer analyzer;
-
public RecoverMergeTask(String storageGroupDir, MergeCallback callback, String taskName,
boolean fullMerge) throws IOException {
super(null, null, storageGroupDir, callback, taskName, fullMerge);
@@ -87,10 +90,10 @@ public class RecoverMergeTask extends MergeTask {
private void resumeAfterFilesLogged(boolean continueMerge) throws IOException {
if (continueMerge) {
- resumeMerge();
- MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
+ resumeMergeProgress();
+ MergeChunkTask mergeChunkTask = new MergeChunkTask(mergedChunkCnt, unmergedChunkCnt,
unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, analyzer.getUnmergedPaths());
- mergeSeriesTask.mergeSeries();
+ mergeChunkTask.mergeSeries();
MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
@@ -101,26 +104,25 @@ public class RecoverMergeTask extends MergeTask {
private void resumeAfterAllTsMerged(boolean continueMerge) throws IOException {
if (continueMerge) {
- resumeMerge();
+ resumeMergeProgress();
MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
unmergedChunkStartTimes, mergeLogger, resource, analyzer.getUnmergedFiles());
mergeFileTask.mergeFiles();
} else {
- // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
- // will recover them, so they are not a concern here
+ // NOTICE: although some of the seqFiles may have been truncated in the last merge, we do not
+ // recover them here because later TsFile recovery will recover them
truncateFiles();
}
cleanUp(continueMerge);
}
- private void resumeMerge() throws IOException {
+ private void resumeMergeProgress() throws IOException {
mergeLogger = new MergeLogger(storageGroupDir);
truncateFiles();
recoverChunkCounts();
}
-
- // scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
+ // scan the metadata to compute how many chunks are merged/unmerged so at last we can decide to
// move the merged chunks or the unmerged chunks
private void recoverChunkCounts() throws IOException {
logger.debug("{} recovering chunk counts", taskName);
@@ -143,25 +145,25 @@ public class RecoverMergeTask extends MergeTask {
mergeFileWriter.getVisibleMetadataList(path.getDevice(), path.getMeasurement(), null);
mergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? mergeFileChunks.size() :
v + mergeFileChunks.size());
- int seqIndex = 0;
- int mergeIndex = 0;
+ int seqChunkIndex = 0;
+ int mergeChunkIndex = 0;
int unmergedCnt = 0;
- while (seqIndex < seqFileChunks.size() && mergeIndex < mergeFileChunks.size()) {
- ChunkMetaData seqChunk = seqFileChunks.get(seqIndex);
- ChunkMetaData mergedChunk = mergeFileChunks.get(mergeIndex);
+ while (seqChunkIndex < seqFileChunks.size() && mergeChunkIndex < mergeFileChunks.size()) {
+ ChunkMetaData seqChunk = seqFileChunks.get(seqChunkIndex);
+ ChunkMetaData mergedChunk = mergeFileChunks.get(mergeChunkIndex);
if (seqChunk.getStartTime() < mergedChunk.getStartTime()) {
// this seqChunk is unmerged
unmergedCnt ++;
- seqIndex ++;
+ seqChunkIndex ++;
unmergedChunkStartTimes.get(tsFileResource).get(path).add(seqChunk.getStartTime());
} else if (mergedChunk.getStartTime() <= seqChunk.getStartTime() &&
seqChunk.getStartTime() <= mergedChunk.getEndTime()) {
// this seqChunk is merged
- seqIndex ++;
+ seqChunkIndex ++;
} else {
// seqChunk.startTime > mergeChunk.endTime, find next mergedChunk that may cover the
// seqChunk
- mergeIndex ++;
+ mergeChunkIndex ++;
}
}
int finalUnmergedCnt = unmergedCnt;
@@ -183,8 +185,4 @@ public class RecoverMergeTask extends MergeTask {
}
}
}
-
-
-
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index 9a3a830..e844c01 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -150,4 +152,33 @@ public class MergeUtils {
}
return chunk.getHeader().getNumOfPages();
}
+
+ public static boolean fileOverlap(TsFileResource seqFile, TsFileResource unseqFile) {
+ Map<String, Long> seqStartTimes = seqFile.getStartTimeMap();
+ Map<String, Long> seqEndTimes = seqFile.getEndTimeMap();
+ Map<String, Long> unseqStartTimes = unseqFile.getStartTimeMap();
+ Map<String, Long> unseqEndTimes = unseqFile.getEndTimeMap();
+
+ for (Entry<String, Long> seqEntry : seqStartTimes.entrySet()) {
+ Long unseqStartTime = unseqStartTimes.get(seqEntry.getKey());
+ if (unseqStartTime == null) {
+ continue;
+ }
+ Long unseqEndTime = unseqEndTimes.get(seqEntry.getKey());
+ Long seqStartTime = seqEntry.getValue();
+ Long seqEndTime = seqEndTimes.get(seqEntry.getKey());
+
+ if (intervalOverlap(seqStartTime, seqEndTime, unseqStartTime, unseqEndTime)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static boolean intervalOverlap(long l1, long r1, long l2, long r2) {
+ return (l1 <= l2 && l2 <= r1) ||
+ (l1 <= r2 && r2 <= r1) ||
+ (l2 <= l1 && l1 <= r2) ||
+ (l2 <= r1 && r1 <= r2);
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java
index 78e561e..e7d9c70 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java
@@ -34,4 +34,8 @@ public interface ChunkLoader {
*/
void close() throws IOException;
+ /**
+ * clear Chunk cache if used.
+ */
+ void clear();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
index 4907634..ae16a97 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
@@ -69,4 +69,8 @@ public class ChunkLoaderImpl implements ChunkLoader {
reader.close();
}
+ @Override
+ public void clear() {
+ chunkCache.clear();
+ }
}