You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by sp...@apache.org on 2023/03/04 13:44:16 UTC
[iotdb] branch research/column-compaction updated: add multi-column compaction (#9214)
This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch research/column-compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/column-compaction by this push:
new 45d49dce74 add multi-column compaction (#9214)
45d49dce74 is described below
commit 45d49dce745fbedf16991f8e2295ae9ab544e232
Author: Chenguang Fang <th...@163.com>
AuthorDate: Sat Mar 4 21:44:10 2023 +0800
add multi-column compaction (#9214)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 38 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +
.../db/engine/compaction/CompactionScheduler.java | 2 +
.../sizetiered/SizeTieredCompactionSelector.java | 470 ++++++-
.../inner/sizetiered/SizeTieredCompactionTask.java | 8 -
.../apache/iotdb/db/metadata/path/AlignedPath.java | 24 +
.../apache/iotdb/db/metadata/path/PartialPath.java | 13 +
.../db/query/control/QueryResourceManager.java | 7 +
.../query/reader/chunk/DiskAlignedChunkLoader.java | 16 +
.../reader/series/AlignedSeriesBitmapReader.java | 174 +++
.../db/query/reader/series/SeriesBitmapReader.java | 1474 ++++++++++++++++++++
.../reader/series/SeriesRawDataPrefetchReader.java | 169 +++
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 17 +
.../read/reader/chunk/AlignedChunkReader.java | 129 ++
.../reader/page/AlignedPagePrefetchReader.java | 148 ++
.../read/reader/page/ValuePageBitmapReader.java | 237 ++++
16 files changed, 2914 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b195d3df55..1ac6569d55 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -374,13 +374,13 @@ public class IoTDBConfig {
private int avgSeriesPointNumberThreshold = 100000;
/** Enable inner space compaction for sequence files */
- private boolean enableSeqSpaceCompaction = true;
+ private boolean enableSeqSpaceCompaction = false;
/** Enable inner space compaction for unsequence files */
private boolean enableUnseqSpaceCompaction = true;
/** Compact the unsequence files into the overlapped sequence files */
- private boolean enableCrossSpaceCompaction = true;
+ private boolean enableCrossSpaceCompaction = false;
/**
* The strategy of inner space compaction task. There are just one inner space compaction strategy
@@ -409,6 +409,12 @@ public class IoTDBConfig {
/** The target tsfile size in compaction, 1 GB by default */
private long targetCompactionFileSize = 1073741824L;
+ private int maxCompactionLevel = 100;
+
+ private int maxFileNumInLevel = 50;
+
+ private String compactionSelectFileMethod = "mcc";
+
/** The target chunk size in compaction. */
private long targetChunkSize = 1048576L;
@@ -443,10 +449,10 @@ public class IoTDBConfig {
private long maxCrossCompactionCandidateFileSize = 1024 * 1024 * 1024 * 5L;
/** The interval of compaction task scheduling in each virtual storage group. The unit is ms. */
- private long compactionScheduleIntervalInMs = 60_000L;
+ private long compactionScheduleIntervalInMs = 1_000L;
/** The interval of compaction task submission from queue in CompactionTaskManager */
- private long compactionSubmissionIntervalInMs = 60_000L;
+ private long compactionSubmissionIntervalInMs = 1_000L;
/**
* The number of sub compaction threads to be set up to perform compaction. Currently only works
@@ -2547,10 +2553,34 @@ public class IoTDBConfig {
return targetCompactionFileSize;
}
+ public int getMaxCompactionLevel() {
+ return maxCompactionLevel;
+ }
+
+ public int getMaxFileNumInLevel() {
+ return maxFileNumInLevel;
+ }
+
+ public String getCompactionSelectFileMethod() {
+ return compactionSelectFileMethod;
+ }
+
public void setTargetCompactionFileSize(long targetCompactionFileSize) {
this.targetCompactionFileSize = targetCompactionFileSize;
}
+ public void setMaxCompactionLevel(int maxCompactionLevel) {
+ this.maxCompactionLevel = maxCompactionLevel;
+ }
+
+ public void setMaxFileNumInLevel(int maxFileNumInLevel) {
+ this.maxFileNumInLevel = maxFileNumInLevel;
+ }
+
+ public void setCompactionSelectFileMethod(String compactionSelectFileMethod) {
+ this.compactionSelectFileMethod = compactionSelectFileMethod;
+ }
+
public long getTargetChunkSize() {
return targetChunkSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2616bbb15e..fb8142d406 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -553,6 +553,17 @@ public class IoTDBDescriptor {
properties.getProperty(
"concurrent_compaction_thread",
Integer.toString(conf.getConcurrentCompactionThread()))));
+ conf.setMaxCompactionLevel(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_compaction_level", Integer.toString(conf.getMaxCompactionLevel()))));
+ conf.setMaxFileNumInLevel(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_file_num_in_level", Integer.toString(conf.getMaxFileNumInLevel()))));
+ conf.setCompactionSelectFileMethod(
+ properties.getProperty(
+ "compaction_select_file_method", conf.getCompactionSelectFileMethod()));
conf.setTargetCompactionFileSize(
Long.parseLong(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
index 62dc5f424e..0fefe484d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
@@ -46,6 +46,7 @@ public class CompactionScheduler {
if (!tsFileManager.isAllowCompaction()) {
return;
}
+
tryToSubmitCrossSpaceCompactionTask(
tsFileManager.getStorageGroupName(),
tsFileManager.getVirtualStorageGroup(),
@@ -53,6 +54,7 @@ public class CompactionScheduler {
timePartition,
tsFileManager,
new CrossSpaceCompactionTaskFactory());
+
tryToSubmitInnerSpaceCompactionTask(
tsFileManager.getStorageGroupName(),
tsFileManager.getVirtualStorageGroup(),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index ee4a45d9db..9c453ffbd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -24,22 +24,34 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionSelector;
import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTaskFactory;
+import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataPrefetchReader;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.PriorityQueue;
+import java.util.*;
+import java.util.stream.Collectors;
/**
* SizeTieredCompactionSelector selects files to be compacted based on the size of files. The
@@ -86,9 +98,17 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
try {
int maxLevel = searchMaxFileLevel();
- for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
- if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
- break;
+ for (int currentLevel = 0;
+ currentLevel <= maxLevel && currentLevel <= config.getMaxCompactionLevel();
+ currentLevel++) {
+ if (this.sequence) {
+ if (!selectLevelTaskSeq(currentLevel, taskPriorityQueue)) {
+ break;
+ }
+ } else {
+ if (!selectLevelTaskUnseq(currentLevel, taskPriorityQueue)) {
+ break;
+ }
}
}
while (taskPriorityQueue.size() > 0) {
@@ -113,19 +133,235 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
* @return return whether to continue the search to higher levels
* @throws IOException
*/
- private boolean selectLevelTask(
+ private boolean selectLevelTaskSeq(
+ int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
+ throws IOException {
+ boolean shouldContinueToSearch = true;
+ List<TsFileResource> selectedFileList = new ArrayList<>();
+ long selectedFileSize = 0L;
+ long targetCompactionFileSize = config.getTargetCompactionFileSize();
+
+ for (TsFileResource currentFile : tsFileResources) {
+ TsFileNameGenerator.TsFileName currentName =
+ TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+ if (currentName.getInnerCompactionCnt() != level
+ || currentFile.isCompactionCandidate()
+ || currentFile.isCompacting()
+ || !currentFile.isClosed()) {
+ selectedFileList.clear();
+ selectedFileSize = 0L;
+ continue;
+ }
+ LOGGER.debug("Current File is {}, size is {}", currentFile, currentFile.getTsFileSize());
+ selectedFileList.add(currentFile);
+ selectedFileSize += currentFile.getTsFileSize();
+ LOGGER.debug(
+ "Add tsfile {}, current select file num is {}, size is {}",
+ currentFile,
+ selectedFileList.size(),
+ selectedFileSize);
+ // if the file size or file num reach threshold
+ if (selectedFileSize >= targetCompactionFileSize
+ || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+ // submit the task
+ if (selectedFileList.size() > 1) {
+ taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ }
+ selectedFileList = new ArrayList<>();
+ selectedFileSize = 0L;
+ shouldContinueToSearch = false;
+ }
+ }
+ return shouldContinueToSearch;
+ }
+
+ private boolean selectLevelTaskUnseq(
+ int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
+ throws IOException {
+ if (tsFileResources.size() <= 1) {
+ return true;
+ }
+ if (config.getMaxInnerCompactionCandidateFileNum() == 1) {
+ return false;
+ }
+ String compactionSelectFileMethod = config.getCompactionSelectFileMethod();
+
+ if (compactionSelectFileMethod.equals("oldest")) {
+ return selectLevelTaskUnseqOldest(level, taskPriorityQueue);
+ } else if (compactionSelectFileMethod.equals("round")) {
+ return selectLevelTaskUnseqRound(level, taskPriorityQueue);
+ }
+
+ /* greedy */
+ long startTime = System.currentTimeMillis();
+ boolean shouldContinueToSearch = true;
+ List<TsFileResource> selectedFileList = new ArrayList<>();
+ ArrayList<Integer> selectedFileIdx = new ArrayList<>();
+ long selectedFileSize = 0L;
+ long targetCompactionFileSize = config.getTargetCompactionFileSize();
+
+ ArrayList<ArrayList<Long>> timeLists = new ArrayList<>();
+ ArrayList<ArrayList<ArrayList<Boolean>>> allBitmapLists = new ArrayList<>();
+ ArrayList<ArrayList<String>> schemaLists = new ArrayList<>();
+ ArrayList<TsFileResource> curLevelTsFileResources = new ArrayList<>();
+
+ for (TsFileResource currentFile : tsFileResources) {
+ TsFileNameGenerator.TsFileName currentName =
+ TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+ if (currentName.getInnerCompactionCnt() != level
+ || currentFile.isCompactionCandidate()
+ || currentFile.isCompacting()
+ || !currentFile.isClosed()) {
+ continue;
+ } else {
+ if (currentFile.getTsFileSize() > 0) {
+ curLevelTsFileResources.add(currentFile);
+ }
+ }
+ }
+
+ if (curLevelTsFileResources.size() <= config.getMaxFileNumInLevel()) {
+ return shouldContinueToSearch;
+ }
+
+ for (TsFileResource currentFile : curLevelTsFileResources) {
+ TsFileNameGenerator.TsFileName currentName =
+ TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+ // read all files
+ if (!this.sequence) {
+ ArrayList<Long> timeList = new ArrayList<>();
+ ArrayList<ArrayList<Boolean>> bitmapLists = new ArrayList<>();
+ ArrayList<String> schemaList = new ArrayList<>();
+ readMultiFileDataWithoutValues(currentFile, timeList, bitmapLists, schemaList);
+ timeLists.add(timeList);
+ allBitmapLists.add(bitmapLists);
+ schemaLists.add(schemaList);
+ }
+ }
+ long readFileStopTime = System.currentTimeMillis();
+ double readFileTime = (readFileStopTime - startTime) / 1000.0d;
+
+ // generate new task queue for compaction
+ // greedy add files
+ int i;
+ int maxOverlapFinal = -1;
+
+ int selectedNum = config.getMaxInnerCompactionCandidateFileNum();
+
+ for (i = 0; i < curLevelTsFileResources.size() - selectedNum; i++) {
+ ArrayList<Integer> curSelectedFileIdx = new ArrayList<>();
+ List<TsFileResource> curSelectedFileList = new ArrayList<>();
+ long curSelectedFileSize = 0L;
+ curSelectedFileList.add(curLevelTsFileResources.get(i));
+ curSelectedFileIdx.add(i);
+ curSelectedFileSize += curLevelTsFileResources.get(i).getTsFileSize();
+ int totalOverlap = 0;
+ while (curSelectedFileList.size() < timeLists.size()) {
+ int maxOverlapIdx = -1;
+ int maxOverlap = -1;
+ for (int j = curSelectedFileIdx.get(curSelectedFileList.size() - 1) + 1;
+ j < timeLists.size();
+ j++) {
+ int curOverlap = -1;
+ for (int idx : curSelectedFileIdx) {
+ int tmpOverlap =
+ computeOverlapWithBitmap(
+ timeLists.get(idx),
+ timeLists.get(j),
+ allBitmapLists.get(idx),
+ allBitmapLists.get(j),
+ schemaLists.get(idx),
+ schemaLists.get(j));
+ if (tmpOverlap > curOverlap) {
+ curOverlap = tmpOverlap;
+ }
+ }
+ if (curOverlap > maxOverlap) {
+ maxOverlap = curOverlap;
+ maxOverlapIdx = j;
+ }
+ }
+ if (maxOverlap == -1 || maxOverlapIdx == -1) {
+ break;
+ }
+ TsFileResource currentFile = curLevelTsFileResources.get(maxOverlapIdx);
+ curSelectedFileIdx.add(maxOverlapIdx);
+ curSelectedFileList.add(currentFile);
+ curSelectedFileSize += currentFile.getTsFileSize();
+ totalOverlap += maxOverlap;
+
+ if (curSelectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+ // check overlap
+ if (totalOverlap > maxOverlapFinal) {
+ maxOverlapFinal = totalOverlap;
+ selectedFileSize = curSelectedFileSize;
+ selectedFileIdx = new ArrayList<>();
+ for (int tmpIdx : curSelectedFileIdx) {
+ selectedFileIdx.add(tmpIdx);
+ }
+ selectedFileList = new ArrayList<>();
+ for (TsFileResource tsfile : curSelectedFileList) {
+ selectedFileList.add(tsfile);
+ }
+ }
+ break;
+ }
+ }
+ }
+
+ if (selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+ // submit the task
+ if (selectedFileList.size() > 1) {
+ taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ shouldContinueToSearch = false;
+ }
+ }
+ long submitCompactionStopTime = System.currentTimeMillis();
+ double submitCompactionTime = (submitCompactionStopTime - readFileStopTime) / 1000.0d;
+ return shouldContinueToSearch;
+ }
+
+ private boolean selectLevelTaskUnseqRound(
int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
throws IOException {
+ long startTime = System.currentTimeMillis();
boolean shouldContinueToSearch = true;
List<TsFileResource> selectedFileList = new ArrayList<>();
long selectedFileSize = 0L;
long targetCompactionFileSize = config.getTargetCompactionFileSize();
+ int levelFileNum = tsFileResources.size();
for (TsFileResource currentFile : tsFileResources) {
TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
if (currentName.getInnerCompactionCnt() != level
- || currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
+ || currentFile.isCompactionCandidate()
+ || currentFile.isCompacting()
+ || !currentFile.isClosed()) {
+ levelFileNum--;
+ }
+ }
+ if (levelFileNum < config.getMaxFileNumInLevel()) {
+ return shouldContinueToSearch;
+ }
+
+ ArrayList<TsFileResource> randomTsFileResources = new ArrayList<>();
+ ArrayList<Integer> idxList = new ArrayList<>();
+ for (int i = 0; i < tsFileResources.size(); ++i) {
+ idxList.add(i);
+ }
+ Collections.shuffle(idxList);
+ for (int i = 0; i < tsFileResources.size(); ++i) {
+ randomTsFileResources.add(tsFileResources.get(idxList.get(i)));
+ }
+
+ for (TsFileResource currentFile : randomTsFileResources) {
+ TsFileNameGenerator.TsFileName currentName =
+ TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+ if (currentName.getInnerCompactionCnt() != level
+ || currentFile.isCompactionCandidate()
+ || currentFile.isCompacting()
+ || !currentFile.isClosed()) {
selectedFileList.clear();
selectedFileSize = 0L;
continue;
@@ -148,11 +384,75 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
selectedFileList = new ArrayList<>();
selectedFileSize = 0L;
shouldContinueToSearch = false;
+ break;
}
}
return shouldContinueToSearch;
}
+ private boolean selectLevelTaskUnseqOldest(
+ int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
+ throws IOException {
+ long startTime = System.currentTimeMillis();
+ long submitCompactionStopTime = System.currentTimeMillis();
+ double submitCompactionTime = (submitCompactionStopTime - startTime) / 1000.0d;
+
+ boolean shouldContinueToSearch = true;
+ List<TsFileResource> selectedFileList = new ArrayList<>();
+ long selectedFileSize = 0L;
+ long targetCompactionFileSize = config.getTargetCompactionFileSize();
+
+ int levelFileNum = tsFileResources.size();
+ for (TsFileResource currentFile : tsFileResources) {
+ TsFileNameGenerator.TsFileName currentName =
+ TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+ if (currentName.getInnerCompactionCnt() != level
+ || currentFile.isCompactionCandidate()
+ || currentFile.isCompacting()
+ || !currentFile.isClosed()) {
+ levelFileNum--;
+ }
+ }
+ if (levelFileNum < config.getMaxFileNumInLevel()) {
+ return shouldContinueToSearch;
+ }
+
+ for (TsFileResource currentFile : tsFileResources) {
+ TsFileNameGenerator.TsFileName currentName =
+ TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
+ if (currentName.getInnerCompactionCnt() != level
+ || currentFile.isCompactionCandidate()
+ || currentFile.isCompacting()
+ || !currentFile.isClosed()) {
+ selectedFileList.clear();
+ selectedFileSize = 0L;
+ continue;
+ }
+ LOGGER.debug("Current File is {}, size is {}", currentFile, currentFile.getTsFileSize());
+ selectedFileList.add(currentFile);
+ selectedFileSize += currentFile.getTsFileSize();
+ LOGGER.debug(
+ "Add tsfile {}, current select file num is {}, size is {}",
+ currentFile,
+ selectedFileList.size(),
+ selectedFileSize);
+ // if the file size or file num reach threshold
+ if (selectedFileSize >= targetCompactionFileSize
+ || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+ // submit the task
+ if (selectedFileList.size() > 1) {
+ taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ }
+ selectedFileList = new ArrayList<>();
+ selectedFileSize = 0L;
+ shouldContinueToSearch = false;
+ break;
+ }
+ }
+
+ return shouldContinueToSearch;
+ }
+
private int searchMaxFileLevel() throws IOException {
int maxLevel = -1;
Iterator<TsFileResource> iterator = tsFileResources.iterator();
@@ -180,6 +480,154 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
return CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);
}
+ private void readMultiFileDataWithoutValues(
+ TsFileResource tsFileResource,
+ ArrayList<Long> timeList,
+ ArrayList<ArrayList<Boolean>> bitmapLists,
+ ArrayList<String> schemaList) {
+ try {
+ List<TsFileResource> unseqFileResources = new ArrayList<>();
+ unseqFileResources.add(tsFileResource);
+ long queryId = QueryResourceManager.getInstance().assignCompactionPrefetchQueryId();
+ QueryContext queryContext = new QueryContext(queryId);
+ QueryDataSource queryDataSource = new QueryDataSource(new ArrayList<>(), unseqFileResources);
+ QueryResourceManager.getInstance()
+ .getQueryFileManager()
+ .addUsedFilesForQuery(queryId, queryDataSource);
+ MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(unseqFileResources);
+
+ while (deviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
+ String device = deviceInfo.left;
+ boolean isAligned = deviceInfo.right;
+ QueryUtils.fillOrderIndexes(queryDataSource, device, true);
+
+ if (isAligned) {
+ Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice();
+ List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values());
+ if (measurementSchemas.isEmpty()) {
+ return;
+ }
+ List<String> existedMeasurements =
+ measurementSchemas.stream()
+ .map(IMeasurementSchema::getMeasurementId)
+ .collect(Collectors.toList());
+ for (String schema : existedMeasurements) {
+ schemaList.add(schema);
+ }
+ IBatchReader dataBatchReader =
+ // constructReader(
+ constructPrefetchReader(
+ device,
+ existedMeasurements,
+ measurementSchemas,
+ schemaMap.keySet(),
+ queryContext,
+ queryDataSource,
+ true);
+
+ for (int i = 0; i < existedMeasurements.size(); i++) {
+ bitmapLists.add(new ArrayList<>());
+ }
+
+ if (dataBatchReader.hasNextBatch()) {
+ while (dataBatchReader.hasNextBatch()) {
+ BatchData batchData = dataBatchReader.nextBatch();
+ while (batchData.hasCurrent()) {
+ long time = batchData.currentTime();
+ timeList.add(time);
+ TsPrimitiveType[] value = (TsPrimitiveType[]) batchData.currentValue();
+ for (int j = 0; j < value.length; j++) {
+ if (value[j] == null) {
+ bitmapLists.get(j).add(false);
+ } else {
+ bitmapLists.get(j).add(true);
+ }
+ }
+ batchData.next();
+ }
+ }
+ }
+ }
+ tsFileResource.readUnlock();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception occurs while reading multiple file data", e);
+ }
+ }
+
+ private int computeOverlapWithBitmap(
+ ArrayList<Long> timeList1,
+ ArrayList<Long> timeList2,
+ ArrayList<ArrayList<Boolean>> bitmap1,
+ ArrayList<ArrayList<Boolean>> bitmap2,
+ ArrayList<String> schemaList1,
+ ArrayList<String> schemaList2) {
+ int p1 = 0;
+ int p2 = 0;
+ int overlap = 0;
+ ArrayList<Pair<Integer, Integer>> schemaPairs = new ArrayList<>();
+ for (int i = 0; i < schemaList1.size(); i++) {
+ for (int j = 0; j < schemaList2.size(); j++) {
+ if (schemaList1.get(i).equals(schemaList2.get(j))) {
+ schemaPairs.add(new Pair<>(i, j));
+ }
+ }
+ }
+ int overlapOfRow = schemaPairs.size();
+ int updateCheck = -1;
+ while (p1 < timeList1.size() && p2 < timeList2.size()) {
+ if (timeList1.get(p1) < timeList2.get(p2)) {
+ p1++;
+ } else {
+ if (timeList1.get(p1) > timeList2.get(p2)) {
+ p2++;
+ } else {
+ if (updateCheck == 0) {
+ // insert
+ overlap += 1;
+ } else if (updateCheck == 1) {
+ // update
+ overlap += overlapOfRow + 1;
+ } else {
+ Pair<Integer, Integer> schemaPair = schemaPairs.get(0);
+ if (bitmap1.get(schemaPair.left).get(p1) && bitmap2.get(schemaPair.right).get(p2)) {
+ updateCheck = 1;
+ } else {
+ updateCheck = 0;
+ }
+ }
+
+ p1++;
+ p2++;
+ }
+ }
+ }
+ return overlap;
+ }
+
+ public static IBatchReader constructPrefetchReader(
+ String deviceId,
+ List<String> measurementIds,
+ List<IMeasurementSchema> measurementSchemas,
+ Set<String> allSensors,
+ QueryContext queryContext,
+ QueryDataSource queryDataSource,
+ boolean isAlign)
+ throws IllegalPathException {
+ PartialPath seriesPath;
+ TSDataType tsDataType;
+ if (isAlign) {
+ seriesPath = new AlignedPath(deviceId, measurementIds, measurementSchemas);
+ tsDataType = TSDataType.VECTOR;
+ } else {
+ seriesPath = new MeasurementPath(deviceId, measurementIds.get(0), measurementSchemas.get(0));
+ tsDataType = measurementSchemas.get(0).getType();
+ }
+ return new SeriesRawDataPrefetchReader(
+ seriesPath, allSensors, tsDataType, queryContext, queryDataSource, null, null, null, true);
+ }
+
private class SizeTieredCompactionTaskComparator
implements Comparator<Pair<List<TsFileResource>, Long>> {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index aba0a9adec..87fce04c99 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -90,7 +90,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
long startTime = System.currentTimeMillis();
// get resource of target file
String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
- // Here is tmpTargetFile, which is xxx.target
TsFileResource targetTsFileResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(
selectedTsFileResourceList, sequence);
@@ -196,13 +195,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
FileUtils.delete(logFile);
}
- if (targetTsFileResource.isDeleted()) {
- // target resource is empty after compaction, then delete it
- targetTsFileResource.remove();
- } else {
- // set target resource to CLOSED, so that it can be selected to compact
- targetTsFileResource.setStatus(TsFileResourceStatus.CLOSED);
- }
} catch (Throwable throwable) {
LOGGER.warn("{} [Compaction] Start to handle exception", fullStorageGroupName);
if (sizeTieredCompactionLogger != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
index 9ae5b7db1e..51fa6bca97 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java
@@ -37,7 +37,9 @@ import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.fill.AlignedLastPointReader;
import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesBitmapReader;
import org.apache.iotdb.db.query.reader.series.AlignedSeriesReader;
+import org.apache.iotdb.db.query.reader.series.SeriesBitmapReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -281,6 +283,28 @@ public class AlignedPath extends PartialPath {
ascending);
}
+ @Override
+ public SeriesBitmapReader createSeriesBitmapReader(
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ return new AlignedSeriesBitmapReader(
+ this,
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
+ }
+
@Override
@TestOnly
public AlignedSeriesReader createSeriesReader(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
index b55cacd6fc..9cce9e516b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.fill.LastPointReader;
import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesBitmapReader;
import org.apache.iotdb.db.query.reader.series.SeriesReader;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -504,6 +505,18 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
throw new UnsupportedOperationException("Should call exact sub class!");
}
+ public SeriesBitmapReader createSeriesBitmapReader(
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ throw new UnsupportedOperationException("Should call exact sub class!");
+ }
+
@TestOnly
public SeriesReader createSeriesReader(
Set<String> allSensors,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index a5ecb90dbc..be9dc914b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -103,6 +103,13 @@ public class QueryResourceManager {
return queryId;
}
+ public long assignCompactionPrefetchQueryId() {
+ long threadNum = 10;
+ long queryId = Long.MIN_VALUE + threadNum;
+ filePathsManager.addQueryId(queryId);
+ return queryId;
+ }
+
/**
* register temporary file generated by external sort for resource release.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
index 3487682a2e..d8ae569588 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java
@@ -64,4 +64,20 @@ public class DiskAlignedChunkLoader implements IChunkLoader {
}
return new AlignedChunkReader(timeChunk, valueChunkList, timeFilter);
}
+
+ public IChunkReader getChunkPrefetchReader(IChunkMetadata chunkMetaData, Filter timeFilter)
+ throws IOException {
+ AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetaData;
+ Chunk timeChunk =
+ ChunkCache.getInstance()
+ .get((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata(), debug);
+ List<Chunk> valueChunkList = new ArrayList<>();
+ for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
+ valueChunkList.add(
+ valueChunkMetadata == null
+ ? null
+ : ChunkCache.getInstance().get((ChunkMetadata) valueChunkMetadata, debug));
+ }
+ return new AlignedChunkReader(timeChunk, valueChunkList, timeFilter, true);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesBitmapReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesBitmapReader.java
new file mode 100644
index 0000000000..4cdf3dcb73
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesBitmapReader.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.universal.AlignedDescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.AlignedPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Aligned-series variant of {@link SeriesBitmapReader}: it plugs in the aligned
+ * priority-merge readers and loads {@link AlignedTimeSeriesMetadata} instead of
+ * single-measurement metadata. All file/chunk/page traversal logic is inherited.
+ */
+public class AlignedSeriesBitmapReader extends SeriesBitmapReader {
+
+ public AlignedSeriesBitmapReader(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ super(
+ seriesPath,
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
+ }
+
+ @TestOnly
+ public AlignedSeriesBitmapReader(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending) {
+ super(
+ seriesPath,
+ allSensors,
+ dataType,
+ context,
+ seqFileResource,
+ unseqFileResource,
+ timeFilter,
+ valueFilter,
+ ascending);
+ }
+
+ @Override
+ protected PriorityMergeReader getPriorityMergeReader() {
+ return new AlignedPriorityMergeReader();
+ }
+
+ @Override
+ protected DescPriorityMergeReader getDescPriorityMergeReader() {
+ return new AlignedDescPriorityMergeReader();
+ }
+
+ @Override
+ protected AlignedTimeSeriesMetadata loadTimeSeriesMetadata(
+ TsFileResource resource,
+ PartialPath seriesPath,
+ QueryContext context,
+ Filter filter,
+ Set<String> allSensors)
+ throws IOException {
+ // allSensors is unused for aligned series: the AlignedPath already carries every
+ // selected sub-sensor in its measurement list.
+ return FileLoaderUtils.loadTimeSeriesMetadata(
+ resource, (AlignedPath) seriesPath, context, filter);
+ }
+
+ // NOTE(review): a hasNextPage() override that duplicated the superclass implementation
+ // line-for-line (and placed @Override before the Javadoc) was removed here; the
+ // inherited SeriesBitmapReader.hasNextPage() is byte-identical, so behavior is unchanged.
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesBitmapReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesBitmapReader.java
new file mode 100644
index 0000000000..892f0ce23f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesBitmapReader.java
@@ -0,0 +1,1474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.tracing.TracingManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
+import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
+
+public class SeriesBitmapReader {
+
+ // inner class of SeriesReader for order purpose
+ protected TimeOrderUtils orderUtils;
+
+ protected final PartialPath seriesPath;
+
+ // all the sensors in this device;
+ protected final Set<String> allSensors;
+ protected final TSDataType dataType;
+ protected final QueryContext context;
+
+ /*
+ * There is at most one is not null between timeFilter and valueFilter
+ *
+ * timeFilter is pushed down to all pages (seq, unseq) without correctness problem
+ *
+ * valueFilter is pushed down to non-overlapped page only
+ */
+ protected final Filter timeFilter;
+ protected final Filter valueFilter;
+ protected final TsFileFilter fileFilter;
+
+ protected final QueryDataSource dataSource;
+
+ /*
+ * file index
+ */
+ protected int curSeqFileIndex;
+ protected int curUnseqFileIndex;
+
+ /*
+ * TimeSeriesMetadata cache
+ */
+ protected ITimeSeriesMetadata firstTimeSeriesMetadata;
+ protected final List<ITimeSeriesMetadata> seqTimeSeriesMetadata = new LinkedList<>();
+ protected final PriorityQueue<ITimeSeriesMetadata> unSeqTimeSeriesMetadata;
+
+ /*
+ * chunk cache
+ */
+ protected IChunkMetadata firstChunkMetadata;
+ protected final PriorityQueue<IChunkMetadata> cachedChunkMetadata;
+
+ /*
+ * page cache
+ */
+ protected VersionPageReader firstPageReader;
+ protected final List<VersionPageReader> seqPageReaders = new LinkedList<>();
+ protected final PriorityQueue<VersionPageReader> unSeqPageReaders;
+
+ /*
+ * point cache
+ */
+ protected final PriorityMergeReader mergeReader;
+
+ /*
+ * result cache
+ */
+ protected boolean hasCachedNextOverlappedPage;
+ protected BatchData cachedBatchData;
+
+ /**
+ * @param seriesPath For querying aligned series, the seriesPath should be AlignedPath. All
+ * selected series belonging to one aligned device should be all in this one AlignedPath's
+ * measurementList.
+ * @param allSensors For querying aligned series, allSensors are not used.
+ */
+ public SeriesBitmapReader(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ // The raw path may be an ID-table alias; translate it to the canonical query path.
+ this.seriesPath = IDTable.translateQueryPath(seriesPath);
+ this.allSensors = allSensors;
+ this.dataType = dataType;
+ this.context = context;
+ this.dataSource = dataSource;
+ this.timeFilter = timeFilter;
+ this.valueFilter = valueFilter;
+ this.fileFilter = fileFilter;
+ // Ascending reads walk seq files front-to-back; descending reads start from the last
+ // seq file. The unseq index always starts at 0 — unseq files are merged by priority.
+ if (ascending) {
+ this.orderUtils = new AscTimeOrderUtils();
+ mergeReader = getPriorityMergeReader();
+ this.curSeqFileIndex = 0;
+ this.curUnseqFileIndex = 0;
+ } else {
+ this.orderUtils = new DescTimeOrderUtils();
+ mergeReader = getDescPriorityMergeReader();
+ this.curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
+ this.curUnseqFileIndex = 0;
+ }
+
+ // All three queues are ordered by orderUtils' notion of "first" time (min start time
+ // ascending, max end time descending), so peek() always yields the next candidate.
+ unSeqTimeSeriesMetadata =
+ new PriorityQueue<>(
+ orderUtils.comparingLong(
+ timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics())));
+ cachedChunkMetadata =
+ new PriorityQueue<>(
+ orderUtils.comparingLong(
+ chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics())));
+ unSeqPageReaders =
+ new PriorityQueue<>(
+ orderUtils.comparingLong(
+ versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
+ }
+
+ /**
+ * Test-only constructor that builds the {@link QueryDataSource} from raw seq/unseq file
+ * lists. Delegates to the main constructor (fileFilter = null) to avoid duplicating the
+ * ordering and priority-queue setup, then fills the order indexes for the device.
+ *
+ * <p>NOTE(review): the original ran fillOrderIndexes before the field initialization;
+ * delegating runs it after. This assumes fillOrderIndexes does not change
+ * getSeqResourcesSize() (used for the descending start index) — confirm.
+ */
+ @TestOnly
+ @SuppressWarnings("squid:S107")
+ public SeriesBitmapReader(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ List<TsFileResource> seqFileResource,
+ List<TsFileResource> unseqFileResource,
+ Filter timeFilter,
+ Filter valueFilter,
+ boolean ascending) {
+ this(
+ seriesPath,
+ allSensors,
+ dataType,
+ context,
+ new QueryDataSource(seqFileResource, unseqFileResource),
+ timeFilter,
+ valueFilter,
+ null,
+ ascending);
+ QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), ascending);
+ }
+
+ // Factory hooks overridden by AlignedSeriesBitmapReader to supply aligned merge readers.
+ protected PriorityMergeReader getPriorityMergeReader() {
+ return new PriorityMergeReader();
+ }
+
+ protected DescPriorityMergeReader getDescPriorityMergeReader() {
+ return new DescPriorityMergeReader();
+ }
+
+ /** Returns true iff the series has no page, chunk, or file left to read. */
+ public boolean isEmpty() throws IOException {
+ return !(hasNextPage() || hasNextChunk() || hasNextFile());
+ }
+
+ /**
+ * Advances to the next file whose timeseries metadata overlaps the read frontier.
+ *
+ * <p>Precondition: all cached pages and chunks of the previous file must be consumed
+ * first; violating that invariant throws IOException rather than silently losing order.
+ */
+ boolean hasNextFile() throws IOException {
+ // Abort early if the owning query was killed / timed out.
+ if (!QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+ return false;
+ }
+
+ if (!unSeqPageReaders.isEmpty()
+ || firstPageReader != null
+ || mergeReader.hasNextTimeValuePair()) {
+ throw new IOException(
+ "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
+ + unSeqPageReaders.isEmpty()
+ + " firstPageReader != null is "
+ + (firstPageReader != null)
+ + " mergeReader.hasNextTimeValuePair() = "
+ + mergeReader.hasNextTimeValuePair());
+ }
+
+ if (firstChunkMetadata != null || !cachedChunkMetadata.isEmpty()) {
+ throw new IOException("all cached chunks should be consumed first");
+ }
+
+ if (firstTimeSeriesMetadata != null) {
+ return true;
+ }
+
+ // Keep unpacking resources until a first metadata emerges or everything is exhausted.
+ while (firstTimeSeriesMetadata == null
+ && (orderUtils.hasNextSeqResource()
+ || orderUtils.hasNextUnseqResource()
+ || !seqTimeSeriesMetadata.isEmpty()
+ || !unSeqTimeSeriesMetadata.isEmpty())) {
+ // init first time series metadata whose startTime is minimum
+ tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
+ }
+
+ return firstTimeSeriesMetadata != null;
+ }
+
+ /**
+ * Returns true if the current file's time range overlaps the next seq or unseq file.
+ * Note the mixed operators parse as (seqCase) || (unseqCase) since && binds tighter.
+ */
+ boolean isFileOverlapped() throws IOException {
+ if (firstTimeSeriesMetadata == null) {
+ throw new IOException("no first file");
+ }
+
+ Statistics fileStatistics = firstTimeSeriesMetadata.getStatistics();
+ return !seqTimeSeriesMetadata.isEmpty()
+ && orderUtils.isOverlapped(fileStatistics, seqTimeSeriesMetadata.get(0).getStatistics())
+ || !unSeqTimeSeriesMetadata.isEmpty()
+ && orderUtils.isOverlapped(
+ fileStatistics, unSeqTimeSeriesMetadata.peek().getStatistics());
+ }
+
+ // NOTE(review): will NPE if called before hasNextFile() succeeds — callers must check.
+ Statistics currentFileStatistics() {
+ return firstTimeSeriesMetadata.getStatistics();
+ }
+
+ /** Statistics of the index-th value column; only valid for aligned series metadata. */
+ Statistics currentFileStatistics(int index) throws IOException {
+ if (!(firstTimeSeriesMetadata instanceof AlignedTimeSeriesMetadata)) {
+ throw new IOException("Can only get statistics by index from alignedTimeSeriesMetaData");
+ }
+ return ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getStatistics(index);
+ }
+
+ /**
+ * Returns the statistics of the time column of the current file.
+ *
+ * @throws IOException if the current file's metadata is not aligned
+ */
+ public Statistics currentFileTimeStatistics() throws IOException {
+ if (!(firstTimeSeriesMetadata instanceof AlignedTimeSeriesMetadata)) {
+ // Fixed copy-pasted message: this check concerns TimeSeriesMetadata, not ChunkMetadata.
+ throw new IOException(
+ "Can only get statistics of time column from alignedTimeSeriesMetaData");
+ }
+ return ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics();
+ }
+
+ /** True if the current file carries modifications (deletions) for this series. */
+ boolean currentFileModified() throws IOException {
+ if (firstTimeSeriesMetadata == null) {
+ throw new IOException("no first file");
+ }
+ return firstTimeSeriesMetadata.isModified();
+ }
+
+ /** Drops the current file; the next hasNextFile() call will advance past it. */
+ void skipCurrentFile() {
+ firstTimeSeriesMetadata = null;
+ }
+
+ /**
+ * This method should be called after hasNextFile() until no next chunk, make sure that all
+ * overlapped chunks are consumed
+ */
+ boolean hasNextChunk() throws IOException {
+ if (!QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+ return false;
+ }
+
+ // Invariant: the page layer of the previous chunk must be fully drained first.
+ if (!unSeqPageReaders.isEmpty()
+ || firstPageReader != null
+ || mergeReader.hasNextTimeValuePair()) {
+ throw new IOException(
+ "all cached pages should be consumed first unSeqPageReaders.isEmpty() is "
+ + unSeqPageReaders.isEmpty()
+ + " firstPageReader != null is "
+ + (firstPageReader != null)
+ + " mergeReader.hasNextTimeValuePair() = "
+ + mergeReader.hasNextTimeValuePair());
+ }
+
+ if (firstChunkMetadata != null) {
+ return true;
+ }
+
+ // hasNextFile() refills timeseries metadata; initFirstChunkMetadata() may consume it
+ // without producing a chunk (e.g. filtered out), hence the loop.
+ while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
+ initFirstChunkMetadata();
+ }
+ return firstChunkMetadata != null;
+ }
+
+ /** construct first chunk metadata */
+ private void initFirstChunkMetadata() throws IOException {
+ if (firstTimeSeriesMetadata != null) {
+ /*
+ * try to unpack all overlapped TimeSeriesMetadata to cachedChunkMetadata
+ */
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(
+ orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()));
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+ orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()), true);
+ } else {
+ /*
+ * first time series metadata is already unpacked, consume cached ChunkMetadata
+ */
+ while (!cachedChunkMetadata.isEmpty()) {
+ firstChunkMetadata = cachedChunkMetadata.peek();
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(
+ orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+ orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
+ // Unpacking may have pushed an earlier chunk ahead of our candidate; only commit
+ // (poll) when the peeked head is still the same chunk, otherwise retry.
+ if (firstChunkMetadata.equals(cachedChunkMetadata.peek())) {
+ firstChunkMetadata = cachedChunkMetadata.poll();
+ break;
+ }
+ }
+ }
+ // Value-filter pushdown: a non-overlapped, unmodified chunk whose statistics cannot
+ // satisfy the filter can be skipped wholesale without reading pages.
+ if (valueFilter != null
+ && firstChunkMetadata != null
+ && !isChunkOverlapped()
+ && !firstChunkMetadata.isModified()
+ && !valueFilter.satisfy(firstChunkMetadata.getStatistics())) {
+ skipCurrentChunk();
+ }
+ }
+
+ /**
+ * Unpacks every cached timeseries metadata (seq, unseq, and the current first) whose
+ * range reaches endpointTime into cachedChunkMetadata. When init is true, also promotes
+ * the head of the queue to firstChunkMetadata.
+ */
+ private void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+ long endpointTime, boolean init) throws IOException {
+ while (!seqTimeSeriesMetadata.isEmpty()
+ && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) {
+ unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0));
+ }
+ while (!unSeqTimeSeriesMetadata.isEmpty()
+ && orderUtils.isOverlapped(endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) {
+ unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll());
+ }
+
+ if (firstTimeSeriesMetadata != null
+ && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) {
+ unpackOneTimeSeriesMetadata(firstTimeSeriesMetadata);
+ firstTimeSeriesMetadata = null;
+ }
+
+ if (init && firstChunkMetadata == null && !cachedChunkMetadata.isEmpty()) {
+ firstChunkMetadata = cachedChunkMetadata.poll();
+ }
+ }
+
+ /**
+ * Loads the chunk metadata list of one timeseries metadata, tags each chunk with the
+ * seq/unseq flag of its source, and adds them to cachedChunkMetadata.
+ */
+ protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata timeSeriesMetadata)
+ throws IOException {
+ List<IChunkMetadata> chunkMetadataList =
+ FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata);
+ chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
+
+ // for tracing: try to calculate the number of chunk and time-value points in chunk
+ if (context.isEnableTracing()) {
+ long totalChunkPointsNum =
+ chunkMetadataList.stream()
+ .mapToLong(chunkMetadata -> chunkMetadata.getStatistics().getCount())
+ .sum();
+ TracingManager.getInstance()
+ .addChunkInfo(
+ context.getQueryId(),
+ chunkMetadataList.size(),
+ totalChunkPointsNum,
+ timeSeriesMetadata.isSeq());
+ }
+
+ cachedChunkMetadata.addAll(chunkMetadataList);
+ }
+
+ /** True if the current chunk's time range overlaps the next cached chunk. */
+ boolean isChunkOverlapped() throws IOException {
+ if (firstChunkMetadata == null) {
+ throw new IOException("no first chunk");
+ }
+
+ Statistics chunkStatistics = firstChunkMetadata.getStatistics();
+ return !cachedChunkMetadata.isEmpty()
+ && orderUtils.isOverlapped(chunkStatistics, cachedChunkMetadata.peek().getStatistics());
+ }
+
+ // NOTE(review): will NPE if called before hasNextChunk() succeeds — callers must check.
+ Statistics currentChunkStatistics() {
+ return firstChunkMetadata.getStatistics();
+ }
+
+ /** Statistics of the index-th value column; only valid for aligned chunk metadata. */
+ Statistics currentChunkStatistics(int index) throws IOException {
+ if (!(firstChunkMetadata instanceof AlignedChunkMetadata)) {
+ // NOTE(review): "vectorChunkMetaData" is the pre-rename term for aligned metadata;
+ // consider aligning this message with the others ("alignedChunkMetaData").
+ throw new IOException("Can only get statistics by index from vectorChunkMetaData");
+ }
+ return ((AlignedChunkMetadata) firstChunkMetadata).getStatistics(index);
+ }
+
+ /** Statistics of the time column; only valid for aligned chunk metadata. */
+ Statistics currentChunkTimeStatistics() throws IOException {
+ if (!(firstChunkMetadata instanceof AlignedChunkMetadata)) {
+ throw new IOException("Can only get statistics of time column from alignedChunkMetaData");
+ }
+ return ((AlignedChunkMetadata) firstChunkMetadata).getTimeStatistics();
+ }
+
+ /** True if the current chunk carries modifications (deletions). */
+ boolean currentChunkModified() throws IOException {
+ if (firstChunkMetadata == null) {
+ throw new IOException("no first chunk");
+ }
+ return firstChunkMetadata.isModified();
+ }
+
+ /** Drops the current chunk; the next hasNextChunk() call will advance past it. */
+ void skipCurrentChunk() {
+ firstChunkMetadata = null;
+ }
+
+ /**
+ * This method should be called after hasNextChunk() until no next page, make sure that all
+ * overlapped pages are consumed
+ */
+ @SuppressWarnings("squid:S3776")
+ // Suppress high Cognitive Complexity warning
+ boolean hasNextPage() throws IOException {
+ if (!QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+ return false;
+ }
+
+ /*
+ * has overlapped data before
+ */
+ if (hasCachedNextOverlappedPage) {
+ return true;
+ } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
+ // Overlapped points remain from a previous call: merge them into a batch first so
+ // time order is preserved across page boundaries.
+ if (hasNextOverlappedPage()) {
+ cachedBatchData = nextOverlappedPage();
+ if (cachedBatchData != null && cachedBatchData.hasCurrent()) {
+ hasCachedNextOverlappedPage = true;
+ return true;
+ }
+ }
+ }
+
+ if (firstPageReader != null) {
+ return true;
+ }
+
+ /*
+ * construct first page reader
+ */
+ if (firstChunkMetadata != null) {
+ /*
+ * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+ */
+ unpackAllOverlappedChunkMetadataToPageReaders(
+ orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true);
+ } else {
+ /*
+ * first chunk metadata is already unpacked, consume cached pages
+ */
+ initFirstPageReader();
+ }
+
+ if (isExistOverlappedPage()) {
+ return true;
+ }
+
+ // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page
+ // readers
+ while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+
+ initFirstPageReader();
+
+ if (isExistOverlappedPage()) {
+ return true;
+ }
+ }
+ return firstPageReader != null;
+ }
+
+ /**
+ * If the first page is overlapped, eagerly merges the overlapped data into
+ * cachedBatchData and returns true; otherwise returns false.
+ */
+ public boolean isExistOverlappedPage() throws IOException {
+ if (firstPageOverlapped()) {
+ /*
+ * next page is overlapped, read overlapped data and cache it
+ */
+ if (hasNextOverlappedPage()) {
+ cachedBatchData = nextOverlappedPage();
+ if (cachedBatchData != null && cachedBatchData.hasCurrent()) {
+ hasCachedNextOverlappedPage = true;
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * True if the first page reader overlaps the next seq page, the next unseq page, or the
+ * current head of the merge reader. Side effect: unpacks files/chunks up to the first
+ * page's overlap-check time so the comparison is complete.
+ */
+ public boolean firstPageOverlapped() throws IOException {
+ if (firstPageReader == null) {
+ return false;
+ }
+
+ long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false);
+ unpackAllOverlappedChunkMetadataToPageReaders(endpointTime, false);
+
+ return (!seqPageReaders.isEmpty()
+ && orderUtils.isOverlapped(
+ firstPageReader.getStatistics(), seqPageReaders.get(0).getStatistics()))
+ || (!unSeqPageReaders.isEmpty()
+ && orderUtils.isOverlapped(
+ firstPageReader.getStatistics(), unSeqPageReaders.peek().getStatistics())
+ || (mergeReader.hasNextTimeValuePair()
+ && orderUtils.isOverlapped(
+ mergeReader.currentTimeValuePair().getTimestamp(),
+ firstPageReader.getStatistics())));
+ }
+
+ /**
+ * Unpacks every chunk metadata whose range reaches endpointTime into page readers.
+ * When init is true, also promotes the first cached page to firstPageReader.
+ */
+ public void unpackAllOverlappedChunkMetadataToPageReaders(long endpointTime, boolean init)
+ throws IOException {
+ if (firstChunkMetadata != null
+ && orderUtils.isOverlapped(endpointTime, firstChunkMetadata.getStatistics())) {
+ unpackOneChunkMetaData(firstChunkMetadata);
+ firstChunkMetadata = null;
+ }
+ // In case unpacking too many sequence chunks
+ boolean hasMeetSeq = false;
+ while (!cachedChunkMetadata.isEmpty()
+ && orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.peek().getStatistics())) {
+ // At most one seq chunk is unpacked per call: seq chunks never overlap each other,
+ // so a second seq chunk cannot overlap endpointTime's data.
+ if (cachedChunkMetadata.peek().isSeq() && hasMeetSeq) {
+ break;
+ } else if (cachedChunkMetadata.peek().isSeq()) {
+ hasMeetSeq = true;
+ }
+ unpackOneChunkMetaData(cachedChunkMetadata.poll());
+ }
+ if (init
+ && firstPageReader == null
+ && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+ initFirstPageReader();
+ }
+ }
+
+ /**
+ * Loads the page (prefetch) readers of one chunk and distributes them into the seq list
+ * or the unseq priority queue, each wrapped with its version for conflict resolution.
+ *
+ * <p>Seq pages are appended in time order (reversed for descending reads); unseq pages
+ * go through the priority queue, which orders them itself.
+ */
+ private void unpackOneChunkMetaData(IChunkMetadata chunkMetaData) throws IOException {
+ // Uses the prefetch variant of the page-reader loader (this reader's purpose).
+ List<IPageReader> pageReaderList =
+ FileLoaderUtils.loadPagePrefetchReaderList(chunkMetaData, timeFilter);
+
+ // for tracing: try to calculate the number of pages
+ if (context.isEnableTracing()) {
+ addTotalPageNumInTracing(context.getQueryId(), pageReaderList.size());
+ }
+
+ if (chunkMetaData.isSeq()) {
+ if (orderUtils.getAscending()) {
+ for (IPageReader iPageReader : pageReaderList) {
+ seqPageReaders.add(
+ new VersionPageReader(
+ chunkMetaData.getVersion(),
+ chunkMetaData.getOffsetOfChunkHeader(),
+ iPageReader,
+ true));
+ }
+ } else {
+ for (int i = pageReaderList.size() - 1; i >= 0; i--) {
+ seqPageReaders.add(
+ new VersionPageReader(
+ chunkMetaData.getVersion(),
+ chunkMetaData.getOffsetOfChunkHeader(),
+ pageReaderList.get(i),
+ true));
+ }
+ }
+ } else {
+ pageReaderList.forEach(
+ pageReader ->
+ unSeqPageReaders.add(
+ new VersionPageReader(
+ chunkMetaData.getVersion(),
+ chunkMetaData.getOffsetOfChunkHeader(),
+ pageReader,
+ false)));
+ }
+ }
+
+ /** Tracing helper: records how many pages this query has touched. */
+ private void addTotalPageNumInTracing(long queryId, int pageNum) {
+ TracingManager.getInstance().addTotalPageNum(queryId, pageNum);
+ }
+
+ /**
+ * This method should be called after calling hasNextPage.
+ *
+ * <p>hasNextPage may cache firstPageReader if it is not overlapped or cached a BatchData if the
+ * first page is overlapped
+ *
+ * <p>NOTE(review): dereferences firstPageReader without a null check — assumes the
+ * preceding hasNextPage() returned true; NPE otherwise. Confirm callers uphold this.
+ */
+ boolean isPageOverlapped() throws IOException {
+
+ /*
+ * has an overlapped page
+ */
+ if (hasCachedNextOverlappedPage) {
+ return true;
+ }
+
+ /*
+ * has a non-overlapped page in firstPageReader
+ */
+ if (mergeReader.hasNextTimeValuePair()
+ && ((orderUtils.getAscending()
+ && mergeReader.currentTimeValuePair().getTimestamp()
+ <= firstPageReader.getStatistics().getEndTime())
+ || (!orderUtils.getAscending()
+ && mergeReader.currentTimeValuePair().getTimestamp()
+ >= firstPageReader.getStatistics().getStartTime()))) {
+ throw new IOException("overlapped data should be consumed first");
+ }
+
+ Statistics firstPageStatistics = firstPageReader.getStatistics();
+
+ return !unSeqPageReaders.isEmpty()
+ && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics());
+ }
+
+ /** Statistics of the current page, or null if no page is cached. */
+ Statistics currentPageStatistics() {
+ if (firstPageReader == null) {
+ return null;
+ }
+ return firstPageReader.getStatistics();
+ }
+
+ /** Statistics of the index-th value column; only valid for aligned page readers. */
+ Statistics currentPageStatistics(int index) throws IOException {
+ if (firstPageReader == null) {
+ return null;
+ }
+ if (!(firstPageReader.isAlignedPageReader())) {
+ throw new IOException("Can only get statistics by index from AlignedPageReader");
+ }
+ return firstPageReader.getStatistics(index);
+ }
+
+ /** Statistics of the time column; only valid for aligned page readers. */
+ Statistics currentPageTimeStatistics() throws IOException {
+ if (firstPageReader == null) {
+ return null;
+ }
+ if (!(firstPageReader.isAlignedPageReader())) {
+ throw new IOException("Can only get statistics of time column from AlignedPageReader");
+ }
+ return firstPageReader.getTimeStatistics();
+ }
+
+ /** True if the current page carries modifications (deletions). */
+ boolean currentPageModified() throws IOException {
+ if (firstPageReader == null) {
+ throw new IOException("no first page");
+ }
+ return firstPageReader.isModified();
+ }
+
+ /** Drops the current page; the next hasNextPage() call will advance past it. */
+ void skipCurrentPage() {
+ firstPageReader = null;
+ }
+
+ /** This method should only be used when the method isPageOverlapped() return true. */
+ BatchData nextPage() throws IOException {
+
+ // The query-alive check distinguishes "no data" from "query was killed": only the
+ // former is an error here.
+ if (!hasNextPage() && QueryTimeManager.checkQueryAlive(context.getQueryId())) {
+ throw new IOException("no next page, neither non-overlapped nor overlapped");
+ }
+
+ if (hasCachedNextOverlappedPage) {
+ hasCachedNextOverlappedPage = false;
+ return cachedBatchData;
+ } else {
+
+ /*
+ * next page is not overlapped, push down value filter if it exists
+ */
+ if (valueFilter != null) {
+ firstPageReader.setFilter(valueFilter);
+ }
+ BatchData batchData = firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending());
+ firstPageReader = null;
+
+ return batchData;
+ }
+ }
+
+ /**
+ * read overlapped data till currentLargestEndTime in mergeReader, if current batch does not
+ * contain data, read till next currentLargestEndTime again
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ public boolean hasNextOverlappedPage() throws IOException {
+
+ if (hasCachedNextOverlappedPage) {
+ return true;
+ }
+
+ tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
+
+ while (true) {
+
+ // may has overlapped data
+ if (mergeReader.hasNextTimeValuePair()) {
+
+ cachedBatchData =
+ BatchDataFactory.createBatchData(dataType, orderUtils.getAscending(), true);
+ long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+ while (mergeReader.hasNextTimeValuePair()) {
+
+ /*
+ * get current first point in mergeReader, this maybe overlapped later
+ */
+ TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
+
+ if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
+ /*
+ * when the merged point excesses the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime
+ * 1. has cached batch data, we don't need to read more data, just use the cached data later
+ * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+ * we could just use the first page reader later
+ * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(),
+ * we could use the first sequence page reader later
+ */
+ if (cachedBatchData.hasCurrent()
+ || firstPageReader != null
+ || !seqPageReaders.isEmpty()) {
+ break;
+ }
+ // so, we don't have other data except mergeReader
+ currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
+ }
+
+ // unpack all overlapped data for the first timeValuePair
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
+ timeValuePair.getTimestamp(), false);
+ unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
+ unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+ // update if there are unpacked unSeqPageReaders
+ timeValuePair = mergeReader.currentTimeValuePair();
+
+ // from now, the unsequence reader is all unpacked, so we don't need to consider it
+ // we has first page reader now
+ if (firstPageReader != null) {
+ // if current timeValuePair excesses the first page reader's end time, we just use the
+ // cached data
+ if ((orderUtils.getAscending()
+ && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime())
+ || (!orderUtils.getAscending()
+ && timeValuePair.getTimestamp()
+ < firstPageReader.getStatistics().getStartTime())) {
+ cachedBatchData.flip();
+ hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
+ return hasCachedNextOverlappedPage;
+ } else if (orderUtils.isOverlapped(
+ timeValuePair.getTimestamp(), firstPageReader.getStatistics())) {
+ // current timeValuePair is overlapped with firstPageReader, add it to merged reader
+ // and update endTime to the max end time
+ mergeReader.addReader(
+ firstPageReader
+ .getAllSatisfiedPageData(orderUtils.getAscending())
+ .getBatchDataIterator(),
+ firstPageReader.version,
+ orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
+ context);
+ currentPageEndPointTime =
+ updateEndPointTime(currentPageEndPointTime, firstPageReader);
+ firstPageReader = null;
+ }
+ }
+
+ // the seq page readers is not empty, just like first page reader
+ if (!seqPageReaders.isEmpty()) {
+ if ((orderUtils.getAscending()
+ && timeValuePair.getTimestamp()
+ > seqPageReaders.get(0).getStatistics().getEndTime())
+ || (!orderUtils.getAscending()
+ && timeValuePair.getTimestamp()
+ < seqPageReaders.get(0).getStatistics().getStartTime())) {
+ cachedBatchData.flip();
+ hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
+ return hasCachedNextOverlappedPage;
+ } else if (orderUtils.isOverlapped(
+ timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) {
+ VersionPageReader pageReader = seqPageReaders.remove(0);
+ mergeReader.addReader(
+ pageReader
+ .getAllSatisfiedPageData(orderUtils.getAscending())
+ .getBatchDataIterator(),
+ pageReader.version,
+ orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
+ context);
+ currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader);
+ }
+ }
+
+ /*
+ * get the latest first point in mergeReader
+ */
+ timeValuePair = mergeReader.nextTimeValuePair();
+
+ Object valueForFilter = timeValuePair.getValue().getValue();
+
+ // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
+ // only accept AlignedPath with only one sub sensor
+ if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+ for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+ if (tsPrimitiveType != null) {
+ valueForFilter = tsPrimitiveType.getValue();
+ break;
+ }
+ }
+ }
+
+ if (valueFilter == null
+ || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+ cachedBatchData.putAnObject(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
+ }
+ }
+ cachedBatchData.flip();
+ hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
+ /*
+ * if current overlapped page has valid data, return, otherwise read next overlapped page
+ */
+ if (hasCachedNextOverlappedPage) {
+ return true;
+ } else if (mergeReader.hasNextTimeValuePair()) {
+ // condition: seqPage.endTime < mergeReader.currentTime
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+ }
+
+ // Shrinks the current merge window to the nearer page boundary: for ascending reads the
+ // window ends at the smaller endTime; for descending reads it ends at the larger startTime.
+ private long updateEndPointTime(long currentPageEndPointTime, VersionPageReader pageReader) {
+ if (orderUtils.getAscending()) {
+ return Math.min(currentPageEndPointTime, pageReader.getStatistics().getEndTime());
+ } else {
+ return Math.max(currentPageEndPointTime, pageReader.getStatistics().getStartTime());
+ }
+ }
+
+ // Feeds every unsequence page reader that directly overlaps the current read window into
+ // mergeReader. The window end is taken from mergeReader if it already has data, otherwise
+ // from the first page reader's statistics.
+ private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
+
+ /*
+ * no cached page readers
+ */
+ if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
+ return;
+ }
+
+ /*
+ * init firstPageReader
+ */
+ if (firstPageReader == null) {
+ initFirstPageReader();
+ }
+
+ long currentPageEndpointTime;
+ if (mergeReader.hasNextTimeValuePair()) {
+ currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
+ } else {
+ // no merged data yet: bound the window by the first page's overlap-check time
+ currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+ }
+
+ /*
+ * put all currently directly overlapped unseq page reader to merge reader
+ */
+ unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
+ }
+
+ // Selects and removes the first page reader (in read order) from seq/unseq page reader
+ // collections and stores it in this.firstPageReader. Loops because unpacking overlapped
+ // files/chunks can surface a page that precedes the current candidate.
+ public void initFirstPageReader() throws IOException {
+ while (this.firstPageReader == null) {
+ VersionPageReader firstPageReader = getFirstPageReaderFromCachedReaders();
+
+ // unpack overlapped page using current page reader
+ if (firstPageReader != null) {
+ long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime);
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false);
+ unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false);
+
+ // this page after unpacking must be the first page
+ if (firstPageReader.equals(getFirstPageReaderFromCachedReaders())) {
+ this.firstPageReader = firstPageReader;
+ // remove the chosen reader from whichever collection it came from
+ if (!seqPageReaders.isEmpty() && firstPageReader.equals(seqPageReaders.get(0))) {
+ seqPageReaders.remove(0);
+ break;
+ } else if (!unSeqPageReaders.isEmpty()
+ && firstPageReader.equals(unSeqPageReaders.peek())) {
+ unSeqPageReaders.poll();
+ break;
+ }
+ }
+ } else {
+ // no page readers available at all; leave firstPageReader as null
+ return;
+ }
+ }
+ }
+
+ // We use get() and peek() here in case it's not the first page reader before unpacking
+ // Non-destructive peek: returns the page reader that comes first in read order across the
+ // seq list and unseq priority queue, or null when both are empty.
+ private VersionPageReader getFirstPageReaderFromCachedReaders() {
+ VersionPageReader firstPageReader = null;
+ if (!seqPageReaders.isEmpty() && !unSeqPageReaders.isEmpty()) {
+ if (orderUtils.isTakeSeqAsFirst(
+ seqPageReaders.get(0).getStatistics(), unSeqPageReaders.peek().getStatistics())) {
+ firstPageReader = seqPageReaders.get(0);
+ } else {
+ firstPageReader = unSeqPageReaders.peek();
+ }
+ } else if (!seqPageReaders.isEmpty()) {
+ firstPageReader = seqPageReaders.get(0);
+ } else if (!unSeqPageReaders.isEmpty()) {
+ firstPageReader = unSeqPageReaders.peek();
+ }
+ return firstPageReader;
+ }
+
+ // Drains every unseq page reader whose statistics overlap endpointTime into mergeReader.
+ // Also consumes firstPageReader itself if it is an overlapping unseq page.
+ private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
+ throws IOException {
+ while (!unSeqPageReaders.isEmpty()
+ && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().data.getStatistics())) {
+ putPageReaderToMergeReader(unSeqPageReaders.poll());
+ }
+ if (firstPageReader != null
+ && !firstPageReader.isSeq()
+ && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
+ putPageReaderToMergeReader(firstPageReader);
+ // firstPageReader is now owned by mergeReader; clear it so it is re-initialized later
+ firstPageReader = null;
+ }
+ }
+
+ // Loads a page's satisfied data and registers it with mergeReader, keyed by the page's
+ // version priority and bounded by its overlap-check time.
+ private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
+ mergeReader.addReader(
+ pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getBatchDataIterator(),
+ pageReader.version,
+ orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
+ context);
+ }
+
+ // Returns the merged overlapped page cached by hasNextOverlappedPage(), clearing the cache
+ // flag. Throws IOException when no batch is available (caller should check first).
+ public BatchData nextOverlappedPage() throws IOException {
+ if (hasCachedNextOverlappedPage || hasNextOverlappedPage()) {
+ hasCachedNextOverlappedPage = false;
+ return cachedBatchData;
+ }
+ throw new IOException("No more batch data");
+ }
+
+ // Sorts unsequence file resources by their order time (start time ascending or end time
+ // descending, depending on orderUtils) into a new LinkedList.
+ private LinkedList<TsFileResource> sortUnSeqFileResources(List<TsFileResource> tsFileResources) {
+ return tsFileResources.stream()
+ .sorted(orderUtils.comparingLong(tsFileResource -> orderUtils.getOrderTime(tsFileResource)))
+ .collect(Collectors.toCollection(LinkedList::new));
+ }
+
+ /**
+ * unpack all overlapped seq/unseq files and find the first TimeSeriesMetadata
+ *
+ * <p>Because there may be too many files in the scenario used by the user, we cannot open all the
+ * chunks at once, which may cause OOM, so we can only unpack one file at a time when needed. This
+ * approach is likely to be ubiquitous, but it keeps the system running smoothly
+ */
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ protected void tryToUnpackAllOverlappedFilesToTimeSeriesMetadata() throws IOException {
+ /*
+ * Fill sequence TimeSeriesMetadata List until it is not empty
+ */
+ while (seqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextSeqResource()) {
+ unpackSeqTsFileResource();
+ }
+
+ /*
+ * Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty
+ */
+ while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) {
+ unpackUnseqTsFileResource();
+ }
+
+ /*
+ * find end time of the first TimeSeriesMetadata
+ */
+ long endTime = -1L;
+ if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
+ // only has seq
+ endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics());
+ } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
+ // only has unseq
+ endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics());
+ } else if (!seqTimeSeriesMetadata.isEmpty()) {
+ // has seq and unseq
+ endTime =
+ orderUtils.getCurrentEndPoint(
+ seqTimeSeriesMetadata.get(0).getStatistics(),
+ unSeqTimeSeriesMetadata.peek().getStatistics());
+ }
+
+ /*
+ * unpack all directly overlapped seq/unseq files with first TimeSeriesMetadata
+ */
+ if (endTime != -1) {
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime);
+ }
+
+ /*
+ * update the first TimeSeriesMetadata
+ */
+ if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
+ // only has seq
+ firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
+ } else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
+ // only has unseq
+ firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
+ } else if (!seqTimeSeriesMetadata.isEmpty()) {
+ // has seq and unseq
+ if (orderUtils.isTakeSeqAsFirst(
+ seqTimeSeriesMetadata.get(0).getStatistics(),
+ unSeqTimeSeriesMetadata.peek().getStatistics())) {
+ firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
+ } else {
+ firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
+ }
+ }
+ // Statistics-based pruning: safe only when the metadata is not overlapped by other files
+ // and has no modifications, i.e. its statistics are exact for the data it covers.
+ if (valueFilter != null
+ && firstTimeSeriesMetadata != null
+ && !isFileOverlapped()
+ && !firstTimeSeriesMetadata.isModified()
+ && !valueFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) {
+ firstTimeSeriesMetadata = null;
+ }
+ }
+
+ // Unpacks every remaining seq and unseq file resource whose time range overlaps
+ // endpointTime into the corresponding TimeSeriesMetadata collections.
+ protected void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
+ throws IOException {
+ while (orderUtils.hasNextUnseqResource()
+ && orderUtils.isOverlapped(endpointTime, orderUtils.getNextUnseqFileResource(false))) {
+ unpackUnseqTsFileResource();
+ }
+ while (orderUtils.hasNextSeqResource()
+ && orderUtils.isOverlapped(endpointTime, orderUtils.getNextSeqFileResource(false))) {
+ unpackSeqTsFileResource();
+ }
+ }
+
+ // Consumes the next sequence file resource (advancing the seq cursor) and, if the series
+ // exists in it, adds its TimeSeriesMetadata (flagged seq) to seqTimeSeriesMetadata.
+ protected void unpackSeqTsFileResource() throws IOException {
+ ITimeSeriesMetadata timeseriesMetadata =
+ loadTimeSeriesMetadata(
+ orderUtils.getNextSeqFileResource(true),
+ seriesPath,
+ context,
+ getAnyFilter(),
+ allSensors);
+ if (timeseriesMetadata != null) {
+ timeseriesMetadata.setSeq(true);
+ seqTimeSeriesMetadata.add(timeseriesMetadata);
+ }
+ }
+
+ // Consumes the next unsequence file resource (advancing the unseq cursor) and, if the series
+ // exists in it, adds its TimeSeriesMetadata (flagged unseq) to unSeqTimeSeriesMetadata.
+ protected void unpackUnseqTsFileResource() throws IOException {
+ ITimeSeriesMetadata timeseriesMetadata =
+ loadTimeSeriesMetadata(
+ orderUtils.getNextUnseqFileResource(true),
+ seriesPath,
+ context,
+ getAnyFilter(),
+ allSensors);
+ if (timeseriesMetadata != null) {
+ // NOTE(review): unseq metadata is unconditionally marked modified here, unlike the seq
+ // path — presumably to defeat statistics-based pruning for unseq data; confirm intent.
+ timeseriesMetadata.setModified(true);
+ timeseriesMetadata.setSeq(false);
+ unSeqTimeSeriesMetadata.add(timeseriesMetadata);
+ }
+ }
+
+ // Loads the TimeSeriesMetadata of seriesPath from one file resource; protected so that
+ // subclasses can override the loading strategy. Delegates to FileLoaderUtils.
+ protected ITimeSeriesMetadata loadTimeSeriesMetadata(
+ TsFileResource resource,
+ PartialPath seriesPath,
+ QueryContext context,
+ Filter filter,
+ Set<String> allSensors)
+ throws IOException {
+ return FileLoaderUtils.loadTimeSeriesMetadata(
+ resource, seriesPath, context, filter, allSensors);
+ }
+
+ // Returns the time filter when present, otherwise the value filter (either may be null).
+ protected Filter getAnyFilter() {
+ return timeFilter != null ? timeFilter : valueFilter;
+ }
+
+ // Updates the bound of the time filter in place; assumes timeFilter is a UnaryFilter
+ // (throws ClassCastException otherwise).
+ void setTimeFilter(long timestamp) {
+ ((UnaryFilter) timeFilter).setValue(timestamp);
+ }
+
+ // Accessor for the current time filter (may be null).
+ Filter getTimeFilter() {
+ return timeFilter;
+ }
+
+ // Wraps an IPageReader with its merge priority (version + offset) and a seq/unseq flag,
+ // so page readers from different files can be ordered and merged consistently.
+ private class VersionPageReader {
+
+ // priority used by mergeReader to resolve overlapping values
+ protected MergeReaderPriority version;
+ // the underlying page reader
+ protected IPageReader data;
+
+ // true if the page comes from a sequence file
+ protected boolean isSeq;
+
+ VersionPageReader(long version, long offset, IPageReader data, boolean isSeq) {
+ this.version = new MergeReaderPriority(version, offset);
+ this.data = data;
+ this.isSeq = isSeq;
+ }
+
+ public boolean isAlignedPageReader() {
+ return data instanceof IAlignedPageReader;
+ }
+
+ Statistics getStatistics() {
+ return data.getStatistics();
+ }
+
+ // Statistics of a single value column; only meaningful for aligned pages.
+ Statistics getStatistics(int index) throws IOException {
+ if (!(data instanceof IAlignedPageReader)) {
+ throw new IOException("Can only get statistics by index from AlignedPageReader");
+ }
+ return ((IAlignedPageReader) data).getStatistics(index);
+ }
+
+ // Statistics of the time column; only meaningful for aligned pages.
+ Statistics getTimeStatistics() throws IOException {
+ if (!(data instanceof IAlignedPageReader)) {
+ throw new IOException("Can only get statistics of time column from AlignedPageReader");
+ }
+ return ((IAlignedPageReader) data).getTimeStatistics();
+ }
+
+ BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
+ return data.getAllSatisfiedPageData(ascending);
+ }
+
+ void setFilter(Filter filter) {
+ data.setFilter(filter);
+ }
+
+ boolean isModified() {
+ return data.isModified();
+ }
+
+ public boolean isSeq() {
+ return isSeq;
+ }
+ }
+
+ /**
+ * Abstracts ascending vs. descending traversal: which timestamp of a statistics/file is the
+ * "order" time, how overlap is tested, and how seq/unseq file cursors advance. Implemented by
+ * AscTimeOrderUtils and DescTimeOrderUtils below.
+ */
+ public interface TimeOrderUtils {
+
+ // the timestamp used to order statistics (startTime asc, endTime desc)
+ long getOrderTime(Statistics<? extends Object> statistics);
+
+ long getOrderTime(TsFileResource fileResource);
+
+ // the far edge of a range in read direction, used when testing overlap
+ long getOverlapCheckTime(Statistics<? extends Object> range);
+
+ boolean isOverlapped(Statistics<? extends Object> left, Statistics<? extends Object> right);
+
+ boolean isOverlapped(long time, Statistics<? extends Object> right);
+
+ boolean isOverlapped(long time, TsFileResource right);
+
+ // comparator consistent with the read direction
+ <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);
+
+ long getCurrentEndPoint(long time, Statistics<? extends Object> statistics);
+
+ long getCurrentEndPoint(
+ Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics);
+
+ // true when time has moved past endpointTime in read direction
+ boolean isExcessEndpoint(long time, long endpointTime);
+
+ /** Return true if taking first page reader from seq readers */
+ boolean isTakeSeqAsFirst(
+ Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics);
+
+ boolean getAscending();
+
+ boolean hasNextSeqResource();
+
+ boolean hasNextUnseqResource();
+
+ // isDelete=true advances the cursor (consumes the resource); false just peeks
+ TsFileResource getNextSeqFileResource(boolean isDelete);
+
+ TsFileResource getNextUnseqFileResource(boolean isDelete);
+ }
+
+ // Descending-time traversal: order by endTime, overlap-check against startTime, and the seq
+ // file cursor walks backwards (curSeqFileIndex--).
+ class DescTimeOrderUtils implements TimeOrderUtils {
+
+ @Override
+ public long getOrderTime(Statistics statistics) {
+ return statistics.getEndTime();
+ }
+
+ @Override
+ public long getOrderTime(TsFileResource fileResource) {
+ return fileResource.getEndTime(seriesPath.getDevice());
+ }
+
+ @Override
+ public long getOverlapCheckTime(Statistics range) {
+ return range.getStartTime();
+ }
+
+ @Override
+ public boolean isOverlapped(Statistics left, Statistics right) {
+ return left.getStartTime() <= right.getEndTime();
+ }
+
+ @Override
+ public boolean isOverlapped(long time, Statistics right) {
+ return time <= right.getEndTime();
+ }
+
+ @Override
+ public boolean isOverlapped(long time, TsFileResource right) {
+ return time <= right.getEndTime(seriesPath.getDevice());
+ }
+
+ @Override
+ public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) {
+ Objects.requireNonNull(keyExtractor);
+ // reversed comparison (c2 before c1) yields descending order
+ return (Comparator<T> & Serializable)
+ (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c2), keyExtractor.applyAsLong(c1));
+ }
+
+ @Override
+ public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) {
+ return Math.max(time, statistics.getStartTime());
+ }
+
+ @Override
+ public long getCurrentEndPoint(
+ Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
+ return Math.max(seqStatistics.getStartTime(), unseqStatistics.getStartTime());
+ }
+
+ @Override
+ public boolean isExcessEndpoint(long time, long endpointTime) {
+ return time < endpointTime;
+ }
+
+ @Override
+ public boolean isTakeSeqAsFirst(
+ Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
+ return seqStatistics.getEndTime() > unseqStatistics.getEndTime();
+ }
+
+ @Override
+ public boolean getAscending() {
+ return false;
+ }
+
+ @Override
+ public boolean hasNextSeqResource() {
+ // skip resources that do not satisfy the device/time/file filters
+ while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
+ TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+ if (tsFileResource != null
+ && tsFileResource.isSatisfied(
+ seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug())) {
+ break;
+ }
+ curSeqFileIndex--;
+ }
+ return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
+ }
+
+ @Override
+ public boolean hasNextUnseqResource() {
+ while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
+ TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+ if (tsFileResource != null
+ && tsFileResource.isSatisfied(
+ seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug())) {
+ break;
+ }
+ curUnseqFileIndex++;
+ }
+ return dataSource.hasNextUnseqResource(curUnseqFileIndex);
+ }
+
+ @Override
+ public TsFileResource getNextSeqFileResource(boolean isDelete) {
+ TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+ if (isDelete) {
+ // consume: move the cursor and record the file for query tracing
+ curSeqFileIndex--;
+ if (context.isEnableTracing()) {
+ TracingManager.getInstance().addTsFile(context.getQueryId(), tsFileResource, true);
+ }
+ }
+ return tsFileResource;
+ }
+
+ @Override
+ public TsFileResource getNextUnseqFileResource(boolean isDelete) {
+ TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+ if (isDelete) {
+ curUnseqFileIndex++;
+ if (context.isEnableTracing()) {
+ TracingManager.getInstance().addTsFile(context.getQueryId(), tsFileResource, false);
+ }
+ }
+ return tsFileResource;
+ }
+ }
+
+ // Ascending-time traversal: order by startTime, overlap-check against endTime, and the seq
+ // file cursor walks forwards (curSeqFileIndex++). Mirror image of DescTimeOrderUtils.
+ class AscTimeOrderUtils implements TimeOrderUtils {
+
+ @Override
+ public long getOrderTime(Statistics statistics) {
+ return statistics.getStartTime();
+ }
+
+ @Override
+ public long getOrderTime(TsFileResource fileResource) {
+ return fileResource.getStartTime(seriesPath.getDevice());
+ }
+
+ @Override
+ public long getOverlapCheckTime(Statistics range) {
+ return range.getEndTime();
+ }
+
+ @Override
+ public boolean isOverlapped(Statistics left, Statistics right) {
+ return left.getEndTime() >= right.getStartTime();
+ }
+
+ @Override
+ public boolean isOverlapped(long time, Statistics right) {
+ return time >= right.getStartTime();
+ }
+
+ @Override
+ public boolean isOverlapped(long time, TsFileResource right) {
+ return time >= right.getStartTime(seriesPath.getDevice());
+ }
+
+ @Override
+ public <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor) {
+ Objects.requireNonNull(keyExtractor);
+ return (Comparator<T> & Serializable)
+ (c1, c2) -> Long.compare(keyExtractor.applyAsLong(c1), keyExtractor.applyAsLong(c2));
+ }
+
+ @Override
+ public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) {
+ return Math.min(time, statistics.getEndTime());
+ }
+
+ @Override
+ public long getCurrentEndPoint(
+ Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
+ return Math.min(seqStatistics.getEndTime(), unseqStatistics.getEndTime());
+ }
+
+ @Override
+ public boolean isExcessEndpoint(long time, long endpointTime) {
+ return time > endpointTime;
+ }
+
+ @Override
+ public boolean isTakeSeqAsFirst(
+ Statistics<? extends Object> seqStatistics, Statistics<? extends Object> unseqStatistics) {
+ return seqStatistics.getStartTime() < unseqStatistics.getStartTime();
+ }
+
+ @Override
+ public boolean getAscending() {
+ return true;
+ }
+
+ @Override
+ public boolean hasNextSeqResource() {
+ // skip resources that do not satisfy the device/time/file filters
+ while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
+ TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+ if (tsFileResource != null
+ && tsFileResource.isSatisfied(
+ seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug())) {
+ break;
+ }
+ curSeqFileIndex++;
+ }
+ return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
+ }
+
+ @Override
+ public boolean hasNextUnseqResource() {
+ while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
+ TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+ if (tsFileResource != null
+ && tsFileResource.isSatisfied(
+ seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug())) {
+ break;
+ }
+ curUnseqFileIndex++;
+ }
+ return dataSource.hasNextUnseqResource(curUnseqFileIndex);
+ }
+
+ @Override
+ public TsFileResource getNextSeqFileResource(boolean isDelete) {
+ TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
+ if (isDelete) {
+ // consume: move the cursor and record the file for query tracing
+ curSeqFileIndex++;
+ if (context.isEnableTracing()) {
+ TracingManager.getInstance().addTsFile(context.getQueryId(), tsFileResource, true);
+ }
+ }
+ return tsFileResource;
+ }
+
+ @Override
+ public TsFileResource getNextUnseqFileResource(boolean isDelete) {
+ TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
+ if (isDelete) {
+ curUnseqFileIndex++;
+ if (context.isEnableTracing()) {
+ TracingManager.getInstance().addTsFile(context.getQueryId(), tsFileResource, false);
+ }
+ }
+ return tsFileResource;
+ }
+ }
+
+ // Exposes the active traversal-order strategy (ascending or descending).
+ public TimeOrderUtils getOrderUtils() {
+ return orderUtils;
+ }
+
+ // Test-only accessor for the value filter.
+ @TestOnly
+ public Filter getValueFilter() {
+ return valueFilter;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPrefetchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPrefetchReader.java
new file mode 100644
index 0000000000..44cd258cb7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPrefetchReader.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.series;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * A ManagedSeriesReader that drives a SeriesBitmapReader, consuming its data page by page and
+ * caching one BatchData at a time. Drains pages before chunks before files so reads proceed in
+ * order. Life-cycle flags (managedByQueryManager / hasRemaining) are controlled externally by
+ * the query manager.
+ */
+public class SeriesRawDataPrefetchReader implements ManagedSeriesReader {
+
+ private final SeriesBitmapReader seriesReader;
+
+ // flags owned by the query manager, see ManagedSeriesReader
+ private boolean hasRemaining;
+ private boolean managedByQueryManager;
+
+ // the batch cached by hasNextBatch() until nextBatch() consumes it
+ private BatchData batchData;
+ private boolean hasCachedBatchData = false;
+
+ public SeriesRawDataPrefetchReader(SeriesBitmapReader seriesReader) {
+ this.seriesReader = seriesReader;
+ }
+
+ public SeriesRawDataPrefetchReader(
+ PartialPath seriesPath,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ QueryDataSource dataSource,
+ Filter timeFilter,
+ Filter valueFilter,
+ TsFileFilter fileFilter,
+ boolean ascending) {
+ this.seriesReader =
+ seriesPath.createSeriesBitmapReader(
+ allSensors,
+ dataType,
+ context,
+ dataSource,
+ timeFilter,
+ valueFilter,
+ fileFilter,
+ ascending);
+ }
+
+ /**
+ * This method overrides the AbstractDataReader.hasNextOverlappedPage for pause reads, to achieve
+ * a continuous read
+ */
+ @Override
+ public boolean hasNextBatch() throws IOException {
+
+ if (hasCachedBatchData) {
+ return true;
+ }
+
+ /*
+ * consume page data firstly
+ */
+ if (readPageData()) {
+ hasCachedBatchData = true;
+ return true;
+ }
+
+ /*
+ * consume chunk data secondly
+ */
+ if (readChunkData()) {
+ hasCachedBatchData = true;
+ return true;
+ }
+
+ /*
+ * consume next file finally
+ */
+ while (seriesReader.hasNextFile()) {
+ if (readChunkData()) {
+ hasCachedBatchData = true;
+ return true;
+ }
+ }
+ // reaching here means no batch was cached, so this is always false
+ return hasCachedBatchData;
+ }
+
+ @Override
+ public BatchData nextBatch() throws IOException {
+ if (hasCachedBatchData || hasNextBatch()) {
+ hasCachedBatchData = false;
+ return batchData;
+ }
+ throw new IOException("no next batch");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no resources need to close
+ }
+
+ @Override
+ public boolean isManagedByQueryManager() {
+ return managedByQueryManager;
+ }
+
+ @Override
+ public void setManagedByQueryManager(boolean managedByQueryManager) {
+ this.managedByQueryManager = managedByQueryManager;
+ }
+
+ @Override
+ public boolean hasRemaining() {
+ return hasRemaining;
+ }
+
+ @Override
+ public void setHasRemaining(boolean hasRemaining) {
+ this.hasRemaining = hasRemaining;
+ }
+
+ // Advances through chunks until some page yields a non-empty batch.
+ private boolean readChunkData() throws IOException {
+ while (seriesReader.hasNextChunk()) {
+ if (readPageData()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Advances through pages until a non-empty batch is produced and cached in batchData.
+ private boolean readPageData() throws IOException {
+ while (seriesReader.hasNextPage()) {
+ batchData = seriesReader.nextPage();
+ if (!isEmpty(batchData)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isEmpty(BatchData batchData) {
+ return batchData == null || !batchData.hasCurrent();
+ }
+
+ @TestOnly
+ public SeriesBitmapReader getSeriesReader() {
+ return seriesReader;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 6273add29f..a983e6db5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.chunk.DiskAlignedChunkLoader;
import org.apache.iotdb.db.query.reader.chunk.metadata.DiskAlignedChunkMetadataLoader;
import org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader;
import org.apache.iotdb.db.query.reader.chunk.metadata.MemAlignedChunkMetadataLoader;
@@ -263,4 +264,20 @@ public class FileLoaderUtils {
IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
return chunkReader.loadPageReaderList();
}
+
+ // Builds the page reader list for a chunk, using the prefetch reader variant when the chunk
+ // is loaded from disk as an aligned chunk; otherwise falls back to the ordinary chunk reader.
+ public static List<IPageReader> loadPagePrefetchReaderList(
+ IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException {
+ if (chunkMetaData == null) {
+ throw new IOException("Can't init null chunkMeta");
+ }
+ IChunkLoader chunkLoader = chunkMetaData.getChunkLoader();
+ IChunkReader chunkReader;
+ if (chunkLoader instanceof DiskAlignedChunkLoader) {
+ chunkReader =
+ ((DiskAlignedChunkLoader) chunkLoader).getChunkPrefetchReader(chunkMetaData, timeFilter);
+ } else {
+ chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
+ }
+ return chunkReader.loadPageReaderList();
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
index 3dcec665fe..c83a41795a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.read.reader.page.AlignedPagePrefetchReader;
import org.apache.iotdb.tsfile.read.reader.page.AlignedPageReader;
import java.io.IOException;
@@ -87,6 +88,25 @@ public class AlignedChunkReader implements IChunkReader {
initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList);
}
+ // Prefetch variant of the aligned-chunk constructor: identical setup to the ordinary
+ // constructor, but initializes prefetch page readers instead of ordinary ones.
+ // NOTE(review): the `prefetch` parameter is never read — it only disambiguates this overload
+ // from AlignedChunkReader(Chunk, List<Chunk>, Filter); consider documenting that at call sites.
+ public AlignedChunkReader(
+ Chunk timeChunk, List<Chunk> valueChunkList, Filter filter, boolean prefetch)
+ throws IOException {
+ this.filter = filter;
+ this.timeChunkDataBuffer = timeChunk.getData();
+ this.valueDeleteIntervalList = new ArrayList<>();
+ this.timeChunkHeader = timeChunk.getHeader();
+ this.unCompressor = IUnCompressor.getUnCompressor(timeChunkHeader.getCompressionType());
+ List<Statistics> valueChunkStatisticsList = new ArrayList<>();
+ // a null value chunk means the column is absent in this chunk group; keep list positions
+ valueChunkList.forEach(
+ chunk -> {
+ valueChunkHeaderList.add(chunk == null ? null : chunk.getHeader());
+ valueChunkDataBufferList.add(chunk == null ? null : chunk.getData());
+ valueChunkStatisticsList.add(chunk == null ? null : chunk.getChunkStatistic());
+ valueDeleteIntervalList.add(chunk == null ? null : chunk.getDeleteIntervalList());
+ });
+ initAllPrefetchPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList);
+ }
+
/** construct all the page readers in this chunk */
private void initAllPageReaders(
Statistics timeChunkStatistics, List<Statistics> valueChunkStatisticsList)
@@ -138,6 +158,56 @@ public class AlignedChunkReader implements IChunkReader {
}
}
+ // Walks the time-chunk buffer page by page, deserializing the time page header plus one value
+ // page header per (non-null) value column, and builds an AlignedPagePrefetchReader for every
+ // page whose time header satisfies the filter; unsatisfied pages are skipped in the buffers.
+ private void initAllPrefetchPageReaders(
+ Statistics timeChunkStatistics, List<Statistics> valueChunkStatisticsList)
+ throws IOException {
+ // construct next satisfied page header
+ while (timeChunkDataBuffer.remaining() > 0) {
+ // deserialize a PageHeader from chunkDataBuffer
+ PageHeader timePageHeader;
+ List<PageHeader> valuePageHeaderList = new ArrayList<>();
+
+ // NOTE(review): `exits` appears to be a typo for `exists` (tracks whether any value
+ // column is present on this page); consider renaming in a follow-up.
+ boolean exits = false;
+ // this chunk has only one page
+ if ((timeChunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+ // single-page chunks carry no per-page statistics; reuse the chunk-level statistics
+ timePageHeader = PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkStatistics);
+ for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
+ if (valueChunkDataBufferList.get(i) != null) {
+ exits = true;
+ valuePageHeaderList.add(
+ PageHeader.deserializeFrom(
+ valueChunkDataBufferList.get(i), valueChunkStatisticsList.get(i)));
+ } else {
+ valuePageHeaderList.add(null);
+ }
+ }
+ } else { // this chunk has more than one page
+ timePageHeader =
+ PageHeader.deserializeFrom(timeChunkDataBuffer, timeChunkHeader.getDataType());
+ for (int i = 0; i < valueChunkDataBufferList.size(); i++) {
+ if (valueChunkDataBufferList.get(i) != null) {
+ exits = true;
+ valuePageHeaderList.add(
+ PageHeader.deserializeFrom(
+ valueChunkDataBufferList.get(i), valueChunkHeaderList.get(i).getDataType()));
+ } else {
+ valuePageHeaderList.add(null);
+ }
+ }
+ }
+ // if the current page satisfies
+ if (exits && timePageSatisfied(timePageHeader)) {
+ AlignedPagePrefetchReader alignedPageReader =
+ constructPrefetchPageReaderForNextPage(timePageHeader, valuePageHeaderList);
+ if (alignedPageReader != null) {
+ pageReaderList.add(alignedPageReader);
+ }
+ } else {
+ // advance all buffers past this page's compressed payloads without decoding
+ skipBytesInStreamByLength(timePageHeader, valuePageHeaderList);
+ }
+ }
+ }
+
/** used for time page filter */
private boolean timePageSatisfied(PageHeader timePageHeader) {
long startTime = timePageHeader.getStatistics().getStartTime();
@@ -219,6 +289,65 @@ public class AlignedChunkReader implements IChunkReader {
return alignedPageReader;
}
+  /**
+   * Deserializes the time page and every satisfied value page of the next page, wrapping them in
+   * an {@link AlignedPagePrefetchReader}. Value pages that are absent, empty, or fully rejected
+   * by their delete intervals are skipped in their buffers and represented by {@code null}
+   * entries so column indices stay aligned.
+   *
+   * @param timePageHeader header of the time page to read
+   * @param valuePageHeader per-column value page headers ({@code null} for absent columns)
+   * @return the constructed prefetch reader, or {@code null} when no value column qualified
+   * @throws IOException if page data cannot be read or uncompressed
+   */
+  private AlignedPagePrefetchReader constructPrefetchPageReaderForNextPage(
+      PageHeader timePageHeader, List<PageHeader> valuePageHeader) throws IOException {
+    PageInfo timePageInfo = new PageInfo();
+    getPageInfo(timePageHeader, timeChunkDataBuffer, timeChunkHeader, timePageInfo);
+    List<PageHeader> valuePageHeaderList = new ArrayList<>();
+    List<ByteBuffer> valuePageDataList = new ArrayList<>();
+    List<TSDataType> valueDataTypeList = new ArrayList<>();
+    List<Decoder> valueDecoderList = new ArrayList<>();
+    boolean exist = false;
+    for (int i = 0; i < valuePageHeader.size(); i++) {
+      if (valuePageHeader.get(i) == null
+          || valuePageHeader.get(i).getUncompressedSize() == 0) { // Empty Page
+        valuePageHeaderList.add(null);
+        valuePageDataList.add(null);
+        valueDataTypeList.add(null);
+        valueDecoderList.add(null);
+      } else if (pageSatisfied(
+          valuePageHeader.get(i),
+          valueDeleteIntervalList.get(i))) { // if the page is satisfied, deserialize it
+        // use a fresh PageInfo per column so partially-filled state from one column can never
+        // leak into the next (the previous shared instance relied on getPageInfo overwriting
+        // every field on every call)
+        PageInfo valuePageInfo = new PageInfo();
+        getPageInfo(
+            valuePageHeader.get(i),
+            valueChunkDataBufferList.get(i),
+            valueChunkHeaderList.get(i),
+            valuePageInfo);
+        valuePageHeaderList.add(valuePageInfo.pageHeader);
+        valuePageDataList.add(valuePageInfo.pageData);
+        valueDataTypeList.add(valuePageInfo.dataType);
+        valueDecoderList.add(valuePageInfo.decoder);
+        exist = true;
+      } else { // if the page is not satisfied, just skip it
+        valueChunkDataBufferList
+            .get(i)
+            .position(
+                valueChunkDataBufferList.get(i).position()
+                    + valuePageHeader.get(i).getCompressedSize());
+        valuePageHeaderList.add(null);
+        valuePageDataList.add(null);
+        valueDataTypeList.add(null);
+        valueDecoderList.add(null);
+      }
+    }
+    if (!exist) {
+      return null;
+    }
+    AlignedPagePrefetchReader alignedPageReader =
+        new AlignedPagePrefetchReader(
+            timePageHeader,
+            timePageInfo.pageData,
+            timeDecoder,
+            valuePageHeaderList,
+            valuePageDataList,
+            valueDataTypeList,
+            valueDecoderList,
+            filter);
+    alignedPageReader.setDeleteIntervalList(valueDeleteIntervalList);
+    return alignedPageReader;
+  }
+
/**
* deserialize the page
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPagePrefetchReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPagePrefetchReader.java
new file mode 100644
index 0000000000..8fd7207052
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPagePrefetchReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.reader.page;
+
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
+import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+// we use prefetchReader to only read time + bitmap without deserialize values
+// one can also choose reading bitmap only without time and values
+
+/**
+ * Aligned-page reader that decodes the time column and each value column's presence bitmap but
+ * never deserializes the values themselves: {@link ValuePageBitmapReader#nextValue} returns a
+ * placeholder for every present cell. Intended for the column-compaction prefetch path where
+ * only row presence matters.
+ */
+public class AlignedPagePrefetchReader implements IPageReader, IAlignedPageReader {
+
+ private final TimePageReader timePageReader;
+ // one entry per value column; null marks a column absent from this page
+ private final List<ValuePageBitmapReader> valuePageReaderList;
+ // number of value columns, including absent (null) ones
+ private final int valueCount;
+ private Filter filter;
+ // true if the time page or any present value page has been modified
+ private boolean isModified;
+
+ /**
+ * Builds the time page reader plus one bitmap reader per present value column; null entries in
+ * valuePageHeaderList are preserved as null readers to keep column indices aligned.
+ */
+ public AlignedPagePrefetchReader(
+ PageHeader timePageHeader,
+ ByteBuffer timePageData,
+ Decoder timeDecoder,
+ List<PageHeader> valuePageHeaderList,
+ List<ByteBuffer> valuePageDataList,
+ List<TSDataType> valueDataTypeList,
+ List<Decoder> valueDecoderList,
+ Filter filter) {
+ timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder);
+ isModified = timePageReader.isModified();
+ valuePageReaderList = new ArrayList<>(valuePageHeaderList.size());
+ for (int i = 0; i < valuePageHeaderList.size(); i++) {
+ if (valuePageHeaderList.get(i) != null) {
+ ValuePageBitmapReader valuePageReader =
+ new ValuePageBitmapReader(
+ valuePageHeaderList.get(i),
+ valuePageDataList.get(i),
+ valueDataTypeList.get(i),
+ valueDecoderList.get(i));
+ valuePageReaderList.add(valuePageReader);
+ isModified = isModified || valuePageReader.isModified();
+ } else {
+ valuePageReaderList.add(null);
+ }
+ }
+ this.filter = filter;
+ this.valueCount = valuePageReaderList.size();
+ }
+
+ /**
+ * Returns a VECTOR BatchData of all rows where at least one column is present and the filter
+ * (if any) is satisfied. Values are the placeholders produced by nextValue, not real data.
+ * timeIndex tracks the row position inside the page so each bitmap reader can test its bit.
+ */
+ @Override
+ public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
+ // this is only for implementing the interface
+ BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false);
+ int timeIndex = -1;
+ while (timePageReader.hasNextTime()) {
+ long timestamp = timePageReader.nextTime();
+ timeIndex++;
+ // if all the sub sensors' value are null in current row, just discard it
+ boolean isNull = true;
+ Object notNullObject = null;
+ TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
+ for (int i = 0; i < v.length; i++) {
+ ValuePageBitmapReader pageReader = valuePageReaderList.get(i);
+ v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
+ if (v[i] != null) {
+ isNull = false;
+ notNullObject = v[i].getValue();
+ }
+ }
+ // Currently, if it's a value filter, it will only accept AlignedPath with only one sub
+ // sensor
+ if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
+ pageData.putVector(timestamp, v);
+ }
+ }
+ return pageData.flip();
+ }
+
+ /** Distributes per-column delete intervals to the matching bitmap readers (index-aligned). */
+ public void setDeleteIntervalList(List<List<TimeRange>> list) {
+ for (int i = 0; i < valueCount; i++) {
+ if (valuePageReaderList.get(i) != null) {
+ valuePageReaderList.get(i).setDeleteIntervalList(list.get(i));
+ }
+ }
+ }
+
+ /**
+ * Single-column pages report the value column's statistics; otherwise fall back to the time
+ * page statistics.
+ */
+ @Override
+ public Statistics getStatistics() {
+ return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) != null
+ ? valuePageReaderList.get(0).getStatistics()
+ : timePageReader.getStatistics();
+ }
+
+ /** Statistics of the value column at {@code index}, or null if that column is absent. */
+ @Override
+ public Statistics getStatistics(int index) {
+ ValuePageBitmapReader valuePageReader = valuePageReaderList.get(index);
+ return valuePageReader == null ? null : valuePageReader.getStatistics();
+ }
+
+ @Override
+ public Statistics getTimeStatistics() {
+ return timePageReader.getStatistics();
+ }
+
+ /** Installs the filter, AND-combining it with any filter already set. */
+ @Override
+ public void setFilter(Filter filter) {
+ if (this.filter == null) {
+ this.filter = filter;
+ } else {
+ this.filter = new AndFilter(this.filter, filter);
+ }
+ }
+
+ @Override
+ public boolean isModified() {
+ return isModified;
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageBitmapReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageBitmapReader.java
new file mode 100644
index 0000000000..5869b288af
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageBitmapReader.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.read.reader.page;
+
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Reader for a single value-column page that decodes only the presence bitmap (and, in the
+ * batch methods, the values). In the prefetch path, {@link #nextValue} reports presence with a
+ * fixed placeholder instead of deserializing the real value.
+ */
+public class ValuePageBitmapReader {
+
+ // top bit of a bitmap byte; shifted right to test individual row positions
+ private static final int MASK = 0x80;
+
+ private final PageHeader pageHeader;
+
+ private final TSDataType dataType;
+
+ /** decoder for value column */
+ private final Decoder valueDecoder;
+
+ // presence bitmap, one bit per row, most-significant bit first
+ private byte[] bitmap;
+
+ // number of rows recorded in the page (bitmap bit count)
+ private int size;
+
+ /** value column in memory */
+ protected ByteBuffer valueBuffer;
+
+ /** A list of deleted intervals. */
+ private List<TimeRange> deleteIntervalList;
+
+ // index into deleteIntervalList; only moves forward (see isDeleted)
+ private int deleteCursor = 0;
+
+ /**
+ * Splits pageData into bitmap and value section. Note the final assignment deliberately
+ * reassigns valueBuffer to pageData itself: after splitDataToBitmapAndValue has consumed the
+ * size and bitmap bytes, pageData's position sits at the value section, which is equivalent to
+ * the slice for the sequential relative reads done here (mirrors upstream ValuePageReader).
+ */
+ public ValuePageBitmapReader(
+ PageHeader pageHeader, ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder) {
+ this.dataType = dataType;
+ this.valueDecoder = valueDecoder;
+ this.pageHeader = pageHeader;
+ if (pageData != null) {
+ splitDataToBitmapAndValue(pageData);
+ }
+ this.valueBuffer = pageData;
+ }
+
+ /** Reads the row count and presence bitmap, leaving the value bytes in valueBuffer. */
+ private void splitDataToBitmapAndValue(ByteBuffer pageData) {
+ if (!pageData.hasRemaining()) { // Empty Page
+ return;
+ }
+ this.size = ReadWriteIOUtils.readInt(pageData);
+ this.bitmap = new byte[(size + 7) / 8];
+ pageData.get(bitmap);
+ this.valueBuffer = pageData.slice();
+ }
+
+ /**
+ * return a BatchData with the corresponding timeBatch, the BatchData's dataType is same as this
+ * sub sensor
+ */
+ public BatchData nextBatch(long[] timeBatch, boolean ascending, Filter filter) {
+ BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending, false);
+ for (int i = 0; i < timeBatch.length; i++) {
+ // bit clear -> no value stored for this row; nothing to decode
+ if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+ continue;
+ }
+ long timestamp = timeBatch[i];
+ // decode the value even when it is deleted/filtered out, to keep the decoder in sync
+ switch (dataType) {
+ case BOOLEAN:
+ boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBoolean))) {
+ pageData.putBoolean(timestamp, aBoolean);
+ }
+ break;
+ case INT32:
+ int anInt = valueDecoder.readInt(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
+ pageData.putInt(timestamp, anInt);
+ }
+ break;
+ case INT64:
+ long aLong = valueDecoder.readLong(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
+ pageData.putLong(timestamp, aLong);
+ }
+ break;
+ case FLOAT:
+ float aFloat = valueDecoder.readFloat(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
+ pageData.putFloat(timestamp, aFloat);
+ }
+ break;
+ case DOUBLE:
+ double aDouble = valueDecoder.readDouble(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
+ pageData.putDouble(timestamp, aDouble);
+ }
+ break;
+ case TEXT:
+ Binary aBinary = valueDecoder.readBinary(valueBuffer);
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
+ pageData.putBinary(timestamp, aBinary);
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ }
+ return pageData.flip();
+ }
+
+ /**
+ * Presence-only lookup for the prefetch path: returns null if the bitmap bit is clear or the
+ * timestamp is deleted, otherwise a fixed TsDouble(1.0) placeholder — the real value is never
+ * decoded here, so callers must not interpret the returned value.
+ */
+ public TsPrimitiveType nextValue(long timestamp, int timeIndex) {
+ if (valueBuffer == null || ((bitmap[timeIndex / 8] & 0xFF) & (MASK >>> (timeIndex % 8))) == 0) {
+ return null;
+ } else {
+ if (!isDeleted(timestamp)) {
+ return new TsPrimitiveType.TsDouble(1.0);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * return the value array of the corresponding time, if this sub sensor doesn't have a value in
+ * a time, just fill it with null
+ */
+ public TsPrimitiveType[] nextValueBatch(long[] timeBatch) {
+ // NOTE(review): the array is sized by the page's row count `size`, not timeBatch.length
+ // (upstream ValuePageReader uses timeBatch.length) — presumably they are always equal for
+ // aligned pages; confirm against callers
+ TsPrimitiveType[] valueBatch = new TsPrimitiveType[size];
+ if (valueBuffer == null) {
+ return valueBatch;
+ }
+ for (int i = 0; i < size; i++) {
+ if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+ continue;
+ }
+ // decode even deleted values so the decoder stays aligned with the bitmap
+ switch (dataType) {
+ case BOOLEAN:
+ boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+ if (!isDeleted(timeBatch[i])) {
+ valueBatch[i] = new TsPrimitiveType.TsBoolean(aBoolean);
+ }
+ break;
+ case INT32:
+ int anInt = valueDecoder.readInt(valueBuffer);
+ if (!isDeleted(timeBatch[i])) {
+ valueBatch[i] = new TsPrimitiveType.TsInt(anInt);
+ }
+ break;
+ case INT64:
+ long aLong = valueDecoder.readLong(valueBuffer);
+ if (!isDeleted(timeBatch[i])) {
+ valueBatch[i] = new TsPrimitiveType.TsLong(aLong);
+ }
+ break;
+ case FLOAT:
+ float aFloat = valueDecoder.readFloat(valueBuffer);
+ if (!isDeleted(timeBatch[i])) {
+ valueBatch[i] = new TsPrimitiveType.TsFloat(aFloat);
+ }
+ break;
+ case DOUBLE:
+ double aDouble = valueDecoder.readDouble(valueBuffer);
+ if (!isDeleted(timeBatch[i])) {
+ valueBatch[i] = new TsPrimitiveType.TsDouble(aDouble);
+ }
+ break;
+ case TEXT:
+ Binary aBinary = valueDecoder.readBinary(valueBuffer);
+ if (!isDeleted(timeBatch[i])) {
+ valueBatch[i] = new TsPrimitiveType.TsBinary(aBinary);
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ }
+ return valueBatch;
+ }
+
+ public Statistics getStatistics() {
+ return pageHeader.getStatistics();
+ }
+
+ public void setDeleteIntervalList(List<TimeRange> list) {
+ this.deleteIntervalList = list;
+ }
+
+ public List<TimeRange> getDeleteIntervalList() {
+ return deleteIntervalList;
+ }
+
+ public boolean isModified() {
+ return pageHeader.isModified();
+ }
+
+ /**
+ * True if timestamp falls in a delete interval. The cursor only advances, so this assumes
+ * timestamps are queried in non-decreasing order and intervals are sorted by time — TODO
+ * confirm callers uphold this (same contract as upstream ValuePageReader).
+ */
+ protected boolean isDeleted(long timestamp) {
+ while (deleteIntervalList != null && deleteCursor < deleteIntervalList.size()) {
+ if (deleteIntervalList.get(deleteCursor).contains(timestamp)) {
+ return true;
+ } else if (deleteIntervalList.get(deleteCursor).getMax() < timestamp) {
+ deleteCursor++;
+ } else {
+ return false;
+ }
+ }
+ return false;
+ }
+
+ public TSDataType getDataType() {
+ return dataType;
+ }
+}