Posted to commits@iotdb.apache.org by le...@apache.org on 2023/10/23 18:42:29 UTC
[iotdb] 04/09: minmax-lsm 10%
This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3d47731cc10999efe6e158c9ce9ec24ec5190355
Author: Lei Rui <10...@qq.com>
AuthorDate: Sat Sep 30 13:56:10 2023 +0800
minmax-lsm 10%
---
.../resources/conf/iotdb-engine.properties | 1 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +
.../groupby/GroupByWithoutValueFilterDataSet.java | 40 +-
.../groupby/LocalGroupByExecutor4MinMax.java | 966 +++++++++++++++++++++
.../iotdb/tsfile/common/conf/TSFileConfig.java | 10 +
.../iotdb/tsfile/common/conf/TSFileDescriptor.java | 4 +
.../iotdb/tsfile/read/common/IOMonitor2.java | 2 +
7 files changed, 1015 insertions(+), 16 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8b202feffaf..95228d09694 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -605,6 +605,7 @@ enable_unseq_compaction=false
####################
### Configurations for tsfile-format
####################
+enableMinMaxLSM=false
use_Statistics=true
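
Note: besides iotdb-engine.properties, the new flag can also be flipped programmatically through the TSFileDescriptor singleton, which is how the query-layer code below reads it. A minimal sketch, assuming only the getter/setter added in this commit (class name EnableMinMaxLsmExample is illustrative):

    import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;

    public class EnableMinMaxLsmExample {
      public static void main(String[] args) {
        // Flip the flag for the current JVM, e.g. in a benchmark or test setup.
        TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(true);
        System.out.println(
            TSFileDescriptor.getInstance().getConfig().isEnableMinMaxLSM()); // true
      }
    }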
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b7a80faebae..4711470e002 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -896,6 +896,14 @@ public class IoTDBDescriptor {
}
private void loadTsFileProps(Properties properties) {
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setEnableMinMaxLSM(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enableMinMaxLSM",
+ Boolean.toString(
+ TSFileDescriptor.getInstance().getConfig().isEnableMinMaxLSM()))));
TSFileDescriptor.getInstance()
.getConfig()
.setUseStatistics(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index bd02b795e77..b23d9429e1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -201,24 +201,32 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
boolean ascending)
throws StorageEngineException, QueryProcessException {
if (CONFIG.isEnableCPV()) {
- if (TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
- && TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
+ if (TSFileDescriptor.getInstance().getConfig().isEnableMinMaxLSM()) { // MinMax-LSM
IOMonitor2.dataSetType =
- DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_UseIndex;
- } else if (!TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
- && TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
- IOMonitor2.dataSetType =
- DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoTimeIndex;
- } else if (TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
- && !TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
- IOMonitor2.dataSetType =
- DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoValueIndex;
- } else {
- IOMonitor2.dataSetType =
- DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoTimeValueIndex;
+ DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_EnableMinMaxLSM;
+ return new LocalGroupByExecutor4MinMax(
+ path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
+ } else { // M4-LSM
+ if (TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
+ && TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
+ IOMonitor2.dataSetType =
+ DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_UseIndex;
+ } else if (!TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
+ && TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
+ IOMonitor2.dataSetType =
+ DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoTimeIndex;
+ } else if (TSFileDescriptor.getInstance().getConfig().isUseTimeIndex()
+ && !TSFileDescriptor.getInstance().getConfig().isUseValueIndex()) {
+ IOMonitor2.dataSetType =
+ DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoValueIndex;
+ } else {
+ IOMonitor2.dataSetType =
+ DataSetType
+ .GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoTimeValueIndex;
+ }
+ return new LocalGroupByExecutor4CPV(
+ path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
}
- return new LocalGroupByExecutor4CPV(
- path, allSensors, dataType, context, timeFilter, fileFilter, ascending);
} else { // enableCPV=false
if (TSFileDescriptor.getInstance().getConfig().isUseStatistics()) {
IOMonitor2.dataSetType =
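
Note: the hunk above makes enableMinMaxLSM take precedence inside the enableCPV branch — MinMax-LSM gets its own executor, while the existing M4-LSM (CPV) time/value-index combinations sit one level deeper. A self-contained sketch of the resulting decision tree (the enum and class DispatchSketch are stand-ins; the real code returns executor instances, not enum values):

    public class DispatchSketch {
      enum Executor { MINMAX_LSM, M4_LSM_CPV, PLAIN }

      static Executor pick(boolean enableCPV, boolean enableMinMaxLSM) {
        if (enableCPV) {
          // MinMax-LSM wins over the M4-LSM (CPV) path when both flags are set
          return enableMinMaxLSM ? Executor.MINMAX_LSM : Executor.M4_LSM_CPV;
        }
        return Executor.PLAIN; // statistics-based or plain group-by executor
      }

      public static void main(String[] args) {
        System.out.println(pick(true, true));   // MINMAX_LSM
        System.out.println(pick(true, false));  // M4_LSM_CPV
        System.out.println(pick(false, true));  // PLAIN
      }
    }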
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java
new file mode 100644
index 00000000000..ff074f588cb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java
@@ -0,0 +1,966 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.ChunkSuit4CPV;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+// This is the MFGroupByExecutor in the M4-LSM paper.
+public class LocalGroupByExecutor4MinMax implements GroupByExecutor {
+
+ private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA");
+
+ // Aggregate result buffer of this path
+ private final List<AggregateResult> results = new ArrayList<>();
+
+ private List<ChunkSuit4CPV> currentChunkList;
+ private final List<ChunkSuit4CPV> futureChunkList = new ArrayList<>();
+
+ // this keeps the chunks split off from futureChunkList without destroying the sorted
+ // order of futureChunkList
+ private Map<Integer, List<ChunkSuit4CPV>> splitChunkList = new HashMap<>();
+
+ private Filter timeFilter;
+
+ private TSDataType tsDataType;
+
+ public LocalGroupByExecutor4MinMax(
+ PartialPath path,
+ Set<String> allSensors,
+ TSDataType dataType,
+ QueryContext context,
+ Filter timeFilter,
+ TsFileFilter fileFilter,
+ boolean ascending)
+ throws StorageEngineException, QueryProcessException {
+ // long start = System.nanoTime();
+
+ this.tsDataType = dataType;
+
+ // get all data sources
+ QueryDataSource queryDataSource =
+ QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter);
+
+ // update filter by TTL
+ this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+ SeriesReader seriesReader =
+ new SeriesReader(
+ path,
+ allSensors,
+ dataType,
+ context,
+ queryDataSource,
+ timeFilter,
+ null,
+ fileFilter,
+ ascending);
+
+ // unpackAllOverlappedFilesToTimeSeriesMetadata
+ try {
+ // note: it might be bad to load all chunk metadata at the start
+ futureChunkList.addAll(seriesReader.getAllChunkMetadatas4CPV());
+ // order futureChunkList by chunk startTime
+ futureChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() {
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ return ((Comparable) (o1.getChunkMetadata().getStartTime()))
+ .compareTo(o2.getChunkMetadata().getStartTime());
+ }
+ });
+
+ if (M4_CHUNK_METADATA.isDebugEnabled()) {
+ if (timeFilter instanceof GroupByFilter) {
+ M4_CHUNK_METADATA.debug(
+ "M4_QUERY_PARAM,{},{},{}",
+ ((GroupByFilter) timeFilter).getStartTime(),
+ ((GroupByFilter) timeFilter).getEndTime(),
+ ((GroupByFilter) timeFilter).getInterval());
+ }
+ for (ChunkSuit4CPV chunkSuit4CPV : futureChunkList) {
+ Statistics statistics = chunkSuit4CPV.getChunkMetadata().getStatistics();
+ long FP_t = statistics.getStartTime();
+ long LP_t = statistics.getEndTime();
+ long BP_t = statistics.getBottomTimestamp();
+ long TP_t = statistics.getTopTimestamp();
+ switch (statistics.getType()) {
+ case INT32:
+ int FP_v_int = ((IntegerStatistics) statistics).getFirstValue();
+ int LP_v_int = ((IntegerStatistics) statistics).getLastValue();
+ int BP_v_int = ((IntegerStatistics) statistics).getMinValue();
+ int TP_v_int = ((IntegerStatistics) statistics).getMaxValue();
+ M4_CHUNK_METADATA.debug(
+ "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}",
+ FP_t,
+ LP_t,
+ BP_t,
+ TP_t,
+ FP_v_int,
+ LP_v_int,
+ BP_v_int,
+ TP_v_int,
+ chunkSuit4CPV.getChunkMetadata().getVersion(),
+ chunkSuit4CPV.getChunkMetadata().getOffsetOfChunkHeader(),
+ statistics.getCount());
+ break;
+ case INT64:
+ long FP_v_long = ((LongStatistics) statistics).getFirstValue();
+ long LP_v_long = ((LongStatistics) statistics).getLastValue();
+ long BP_v_long = ((LongStatistics) statistics).getMinValue();
+ long TP_v_long = ((LongStatistics) statistics).getMaxValue();
+ M4_CHUNK_METADATA.debug(
+ "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}",
+ FP_t,
+ LP_t,
+ BP_t,
+ TP_t,
+ FP_v_long,
+ LP_v_long,
+ BP_v_long,
+ TP_v_long,
+ chunkSuit4CPV.getChunkMetadata().getVersion(),
+ chunkSuit4CPV.getChunkMetadata().getOffsetOfChunkHeader(),
+ statistics.getCount());
+ break;
+ case FLOAT:
+ float FP_v_float = ((FloatStatistics) statistics).getFirstValue();
+ float LP_v_float = ((FloatStatistics) statistics).getLastValue();
+ float BP_v_float = ((FloatStatistics) statistics).getMinValue();
+ float TP_v_float = ((FloatStatistics) statistics).getMaxValue();
+ M4_CHUNK_METADATA.debug(
+ "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}",
+ FP_t,
+ LP_t,
+ BP_t,
+ TP_t,
+ FP_v_float,
+ LP_v_float,
+ BP_v_float,
+ TP_v_float,
+ chunkSuit4CPV.getChunkMetadata().getVersion(),
+ chunkSuit4CPV.getChunkMetadata().getOffsetOfChunkHeader(),
+ statistics.getCount());
+ break;
+ case DOUBLE:
+ double FP_v_double = ((DoubleStatistics) statistics).getFirstValue();
+ double LP_v_double = ((DoubleStatistics) statistics).getLastValue();
+ double BP_v_double = ((DoubleStatistics) statistics).getMinValue();
+ double TP_v_double = ((DoubleStatistics) statistics).getMaxValue();
+ M4_CHUNK_METADATA.debug(
+ "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}",
+ FP_t,
+ LP_t,
+ BP_t,
+ TP_t,
+ FP_v_double,
+ LP_v_double,
+ BP_v_double,
+ TP_v_double,
+ chunkSuit4CPV.getChunkMetadata().getVersion(),
+ chunkSuit4CPV.getChunkMetadata().getOffsetOfChunkHeader(),
+ statistics.getCount());
+ break;
+ default:
+ throw new QueryProcessException("unsupported data type!");
+ }
+ }
+ }
+
+ } catch (IOException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+
+ // IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, System.nanoTime() -
+ // start);
+ }
+
+ @Override
+ public void addAggregateResult(AggregateResult aggrResult) {
+ results.add(aggrResult);
+ }
+
+ private void getCurrentChunkListFromFutureChunkList(
+ long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+ throws IOException {
+ // IOMonitor2.M4_LSM_status = Operation.M4_LSM_MERGE_M4_TIME_SPAN;
+
+ // empty currentChunkList
+ currentChunkList = new ArrayList<>();
+
+ // get related chunks from splitChunkList
+ int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval);
+ if (splitChunkList.get(curIdx) != null) {
+ currentChunkList.addAll(splitChunkList.get(curIdx));
+ // when to free splitChunkList memory
+ }
+
+ // iterate futureChunkList
+ ListIterator<ChunkSuit4CPV> itr = futureChunkList.listIterator();
+ while (itr.hasNext()) {
+ ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next());
+ ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ long chunkMinTime = chunkMetadata.getStartTime();
+ long chunkMaxTime = chunkMetadata.getEndTime();
+ if (chunkMinTime >= curEndTime && chunkMinTime < endTime) {
+ // the chunk falls on the right side of the current M4 interval Ii,
+ // and since futureChunkList is ordered by the startTime of chunkMetadata,
+ // the loop can be terminated early.
+ break;
+ } else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) {
+ // the chunk falls on the left side of the current M4 interval Ii
+ // or the chunk falls on the right side of the total query range
+ itr.remove();
+ } else if (chunkMinTime >= curStartTime && chunkMaxTime < curEndTime) {
+ // the chunk falls completely within the current M4 interval Ii
+ currentChunkList.add(chunkSuit4CPV);
+ itr.remove();
+ } else {
+ // the chunk partially overlaps in time with the current M4 interval Ii.
+ // load this chunk, split it on deletes and all w intervals.
+ // add to currentChunkList and futureChunkList.
+ itr.remove();
+ // B: loads chunk data from disk to memory
+ // C: decompress page data, split time&value buffers
+ PageReader pageReader =
+ FileLoaderUtils.loadPageReaderList4CPV(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
+ // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+ // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+ // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGNED
+ // DIRECTLY), WHICH WILL INTRODUCE BUGS!
+
+ // chunk data read operation (b): get the closest data point after or before a timestamp
+ pageReader.split4CPV(
+ startTime,
+ endTime,
+ interval,
+ curStartTime,
+ currentChunkList,
+ splitChunkList,
+ chunkMetadata);
+ }
+ }
+ }
+
+ /**
+ * @param curStartTime closed
+ * @param curEndTime open
+ * @param startTime closed
+ * @param endTime open
+ */
+ @Override
+ public List<AggregateResult> calcResult(
+ long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+ throws IOException {
+ // clear result cache
+ for (AggregateResult result : results) {
+ result.reset();
+ }
+
+ // long start = System.nanoTime();
+ getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime, startTime, endTime, interval);
+ // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start);
+
+ if (currentChunkList.size() == 0) {
+ return results;
+ }
+
+ // start = System.nanoTime();
+ calculateBottomPoint(currentChunkList, startTime, endTime, interval, curStartTime);
+ // IOMonitor2.addMeasure(Operation.M4_LSM_BP, System.nanoTime() - start);
+
+ // start = System.nanoTime();
+ calculateTopPoint(currentChunkList, startTime, endTime, interval, curStartTime);
+ // IOMonitor2.addMeasure(Operation.M4_LSM_TP, System.nanoTime() - start);
+
+ return results;
+ }
+
+ private void calculateBottomPoint(
+ List<ChunkSuit4CPV> currentChunkList,
+ long startTime,
+ long endTime,
+ long interval,
+ long curStartTime)
+ throws IOException {
+ // IOMonitor2.M4_LSM_status = Operation.M4_LSM_BP;
+ // check size>0 because after updateBPTP empty ChunkSuit4CPV will be removed from
+ // currentChunkList
+ while (currentChunkList.size() > 0) { // loop 1
+ // sorted by bottomValue, find BP candidate set
+ // double check the sort order logic for different aggregations
+ currentChunkList.sort(
+ (o1, o2) -> {
+ return ((Comparable) (o1.getStatistics().getMinValue()))
+ .compareTo(o2.getStatistics().getMinValue());
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ });
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ Object value = currentChunkList.get(0).getStatistics().getMinValue();
+ List<ChunkSuit4CPV> candidateSet = new ArrayList<>();
+ for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ if (chunkSuit4CPV.getStatistics().getMinValue().equals(value)) {
+ candidateSet.add(chunkSuit4CPV);
+ } else {
+ break; // note that this is an early break since currentChunkList is sorted
+ }
+ }
+
+ // check whether nonLazyLoad remove affects candidateSet
+ List<ChunkSuit4CPV> nonLazyLoad = new ArrayList<>(candidateSet);
+ // double check the sort order logic for version
+ nonLazyLoad.sort(
+ (o1, o2) ->
+ new MergeReaderPriority(
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
+ .compareTo(
+ new MergeReaderPriority(
+ o1.getChunkMetadata().getVersion(),
+ o1.getChunkMetadata().getOffsetOfChunkHeader())));
+ while (true) { // loop 2
+ // if there is no chunk for lazy loading, then load all chunks in candidateSet,
+ // and apply deleteIntervals, removing the BP whether it is invalidated by deletion or by update
+ if (nonLazyLoad.size() == 0) {
+ for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
+ // Note the pass of delete intervals
+ if (chunkSuit4CPV.getPageReader() == null) {
+ PageReader pageReader =
+ FileLoaderUtils.loadPageReaderList4CPV(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
+ // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+ // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+ // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGNED
+ // DIRECTLY), WHICH WILL INTRODUCE BUGS!
+ chunkSuit4CPV.setPageReader(pageReader);
+ } else {
+ // Note the pass of delete intervals, especially deleting the non-latest candidate
+ // point.
+ // pageReader does not refer to the same deleteInterval as those in chunkMetadata
+ // after chunkMetadata executes insertIntoSortedDeletions
+ chunkSuit4CPV
+ .getPageReader()
+ .setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList());
+ }
+ // chunk data read operation (c)
+ // chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV);
+ chunkSuit4CPV.getPageReader().updateBP_withValueIndex(chunkSuit4CPV);
+ // check if empty
+ if (chunkSuit4CPV.statistics.getCount() == 0) {
+ currentChunkList.remove(chunkSuit4CPV);
+ }
+ }
+ break; // exit loop 2, enter loop 1
+ }
+ // otherwise, extract the next new candidate point from the candidate set with the lazy load
+ // strategy
+ ChunkSuit4CPV candidate = nonLazyLoad.get(0); // sorted by version
+ MergeReaderPriority candidateVersion =
+ new MergeReaderPriority(
+ candidate.getChunkMetadata().getVersion(),
+ candidate.getChunkMetadata().getOffsetOfChunkHeader());
+ long candidateTimestamp = candidate.getStatistics().getBottomTimestamp(); // check
+ Object candidateValue = candidate.getStatistics().getMinValue(); // check
+
+ // verify if this candidate point is deleted
+ boolean isDeletedItself = false;
+ if (candidateTimestamp < curStartTime || candidateTimestamp >= curStartTime + interval) {
+ isDeletedItself = true;
+ } else {
+ isDeletedItself =
+ PageReader.isDeleted(
+ candidateTimestamp, candidate.getChunkMetadata().getDeleteIntervalList());
+ }
+ if (isDeletedItself) {
+ // the candidate point is deleted, then label the chunk as already lazy loaded, and back
+ // to loop 2
+ nonLazyLoad.remove(candidate);
+ // check this can really remove the element
+ // check whether nonLazyLoad remove affects candidateSet
+ // check nonLazyLoad sorted by version number from high to low
+ continue; // back to loop 2
+
+ } else { // not deleted
+ boolean isUpdate = false;
+ // find overlapping chunks with higher versions
+ List<ChunkSuit4CPV> overlaps = new ArrayList<>();
+ for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
+ ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ MergeReaderPriority version =
+ new MergeReaderPriority(
+ chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
+ if (version.compareTo(candidateVersion) <= 0) { // including bottomChunkMetadata
+ continue;
+ }
+ if (candidateTimestamp < chunkSuit4CPV.getStatistics().getStartTime()
+ || candidateTimestamp > chunkSuit4CPV.getStatistics().getEndTime()) {
+ continue;
+ }
+ if (candidateTimestamp == chunkSuit4CPV.getStatistics().getStartTime()
+ || candidateTimestamp == chunkSuit4CPV.getStatistics().getEndTime()) {
+ isUpdate = true;
+ // this case does not need to execute chunk data read operation (a),
+ // because definitely overwrite
+ break;
+ }
+ overlaps.add(chunkSuit4CPV);
+ }
+
+ if (!isUpdate && overlaps.size() == 0) {
+ // no overlaps, then the candidate point is not updated, then it is the final result
+ results
+ .get(0)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ return; // finished
+ } else if (!isUpdate) {
+ // verify whether the candidate point is updated
+ for (ChunkSuit4CPV chunkSuit4CPV : overlaps) {
+ if (chunkSuit4CPV.getPageReader() == null) {
+ PageReader pageReader =
+ FileLoaderUtils.loadPageReaderList4CPV(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
+ // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+ // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+ // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS
+ // ASSIGNED DIRECTLY), WHICH WILL INTRODUCE BUGS!
+ chunkSuit4CPV.setPageReader(pageReader);
+ }
+ // chunk data read operation (a): check existence of data point at a timestamp
+ isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp);
+ if (isUpdate) {
+ // since the candidate point is updated, early break
+ break;
+ }
+ }
+ }
+ if (!isUpdate) {
+ // the candidate point is not updated, then it is the final result
+ results
+ .get(0)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ return; // finished
+ } else {
+ // the candidate point is updated, then label the chunk as already lazy loaded,
+ // add the deletion of the candidate point in deleteInterval, and back to loop 2
+ if (candidate.getChunkMetadata().getDeleteIntervalList() == null) {
+ List<TimeRange> tmp = new ArrayList<>();
+ tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp));
+ candidate.getChunkMetadata().setDeleteIntervalList(tmp);
+ } else {
+ candidate
+ .getChunkMetadata()
+ .insertIntoSortedDeletions(candidateTimestamp, candidateTimestamp); // check
+ }
+ nonLazyLoad.remove(candidate);
+ // check this can really remove the element
+ // check whether nonLazyLoad remove affects candidateSet
+ // check nonLazyLoad sorted by version number from high to low
+ continue; // back to loop 2
+ }
+ }
+ }
+ }
+ }
+
+ private void calculateTopPoint(
+ List<ChunkSuit4CPV> currentChunkList,
+ long startTime,
+ long endTime,
+ long interval,
+ long curStartTime)
+ throws IOException {
+ // IOMonitor2.M4_LSM_status = Operation.M4_LSM_TP;
+ // check size>0 because after updateBPTP empty ChunkSuit4CPV will be removed from
+ // currentChunkList
+ while (currentChunkList.size() > 0) { // loop 1
+ // sorted by topValue, find TP candidate set
+ currentChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() { // double check the sort order logic for different
+ // aggregations
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ return ((Comparable) (o2.getStatistics().getMaxValue()))
+ .compareTo(o1.getStatistics().getMaxValue());
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata,
+ // because statistics of ChunkSuit4CPV is updated, while statistics of
+ // ChunkSuit4CPV.ChunkMetadata
+ // is fixed.
+ }
+ });
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ Object value = currentChunkList.get(0).getStatistics().getMaxValue();
+ List<ChunkSuit4CPV> candidateSet = new ArrayList<>();
+ for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ if (chunkSuit4CPV.getStatistics().getMaxValue().equals(value)) {
+ candidateSet.add(chunkSuit4CPV);
+ } else {
+ break; // note that this is an early break since currentChunkList is sorted
+ }
+ }
+
+ List<ChunkSuit4CPV> nonLazyLoad = new ArrayList<>(candidateSet);
+ // check whether nonLazyLoad remove affects candidateSet
+ nonLazyLoad.sort(
+ new Comparator<ChunkSuit4CPV>() { // double check the sort order logic for version
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ return new MergeReaderPriority(
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
+ .compareTo(
+ new MergeReaderPriority(
+ o1.getChunkMetadata().getVersion(),
+ o1.getChunkMetadata().getOffsetOfChunkHeader()));
+ }
+ });
+ while (true) { // loop 2
+ // if there is no chunk for lazy loading, then load all chunks in candidateSet,
+ // and apply deleteIntervals, removing the TP whether it is invalidated by deletion or by update
+ if (nonLazyLoad.size() == 0) {
+ for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
+ // Note the pass of delete intervals
+ if (chunkSuit4CPV.getPageReader() == null) {
+ PageReader pageReader =
+ FileLoaderUtils.loadPageReaderList4CPV(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
+ // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+ // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+ // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGNED
+ // DIRECTLY), WHICH WILL INTRODUCE BUGS!
+ chunkSuit4CPV.setPageReader(pageReader);
+ } else {
+ // Note the pass of delete intervals, especially deleting the non-latest candidate
+ // point.
+ // pageReader does not refer to the same deleteInterval as those in chunkMetadata
+ // after chunkMetadata executes insertIntoSortedDeletions
+ chunkSuit4CPV
+ .getPageReader()
+ .setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList());
+ }
+ // chunk data read operation (c)
+ // chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV);
+ chunkSuit4CPV.getPageReader().updateTP_withValueIndex(chunkSuit4CPV);
+ // check if empty
+ if (chunkSuit4CPV.statistics.getCount() == 0) {
+ currentChunkList.remove(chunkSuit4CPV);
+ }
+ }
+ break; // exit loop 2, enter loop 1
+ }
+ // otherwise, extract the next new candidate point from the candidate set with the lazy load
+ // strategy
+ ChunkSuit4CPV candidate = nonLazyLoad.get(0); // sorted by version
+ MergeReaderPriority candidateVersion =
+ new MergeReaderPriority(
+ candidate.getChunkMetadata().getVersion(),
+ candidate.getChunkMetadata().getOffsetOfChunkHeader());
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata,
+ // because statistics of ChunkSuit4CPV is updated, while statistics of
+ // ChunkSuit4CPV.ChunkMetadata
+ // is fixed.
+ long candidateTimestamp = candidate.getStatistics().getTopTimestamp(); // check
+ Object candidateValue = candidate.getStatistics().getMaxValue(); // check
+
+ // verify if this candidate point is deleted
+ boolean isDeletedItself = false;
+ if (candidateTimestamp < curStartTime || candidateTimestamp >= curStartTime + interval) {
+ isDeletedItself = true;
+ } else {
+ isDeletedItself =
+ PageReader.isDeleted(
+ candidateTimestamp, candidate.getChunkMetadata().getDeleteIntervalList());
+ }
+ if (isDeletedItself) {
+ // the candidate point is deleted, then label the chunk as already lazy loaded, and back
+ // to loop 2
+ nonLazyLoad.remove(candidate);
+ // check this can really remove the element
+ // check whether nonLazyLoad remove affects candidateSet
+ // check nonLazyLoad sorted by version number from high to low
+ continue; // back to loop 2
+
+ } else { // not deleted
+ boolean isUpdate = false;
+ // find overlapping chunks with higher versions
+ List<ChunkSuit4CPV> overlaps = new ArrayList<>();
+ for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
+ ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ MergeReaderPriority version =
+ new MergeReaderPriority(
+ chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
+ if (version.compareTo(candidateVersion) <= 0) { // including topChunkMetadata
+ continue;
+ }
+ if (candidateTimestamp < chunkMetadata.getStartTime()
+ || candidateTimestamp > chunkMetadata.getEndTime()) {
+ continue;
+ }
+ if (candidateTimestamp == chunkSuit4CPV.getStatistics().getStartTime()
+ || candidateTimestamp == chunkSuit4CPV.getStatistics().getEndTime()) {
+ isUpdate = true; // note that here overlaps does not add.
+ // this case does not need to execute chunk data read operation (a),
+ // because definitely overwrite
+ break;
+ }
+ overlaps.add(chunkSuit4CPV);
+ }
+
+ if (!isUpdate && overlaps.size() == 0) {
+ // no overlaps, then the candidate point is not updated, then it is the final result
+ results
+ .get(1)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ return; // finished
+ } else if (!isUpdate) {
+ // verify whether the candidate point is updated
+ for (ChunkSuit4CPV chunkSuit4CPV : overlaps) {
+ if (chunkSuit4CPV.getPageReader() == null) {
+ PageReader pageReader =
+ FileLoaderUtils.loadPageReaderList4CPV(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
+ // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+ // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+ // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS
+ // ASSIGNED DIRECTLY), WHICH WILL INTRODUCE BUGS!
+ chunkSuit4CPV.setPageReader(pageReader);
+ }
+ // chunk data read operation (a): check existence of data point at a timestamp
+ isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp);
+ if (isUpdate) {
+ // since the candidate point is updated, early break
+ break;
+ }
+ }
+ }
+ if (!isUpdate) {
+ // the candidate point is not updated, then it is the final result
+ results
+ .get(1)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ return; // finished
+ } else {
+ // the candidate point is updated, then label the chunk as already lazy loaded,
+ // add the deletion of the candidate point in deleteInterval, and back to loop 2
+ if (candidate.getChunkMetadata().getDeleteIntervalList() == null) {
+ List<TimeRange> tmp = new ArrayList<>();
+ tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp));
+ candidate.getChunkMetadata().setDeleteIntervalList(tmp);
+ } else {
+ candidate
+ .getChunkMetadata()
+ .insertIntoSortedDeletions(candidateTimestamp, candidateTimestamp); // check
+ }
+ nonLazyLoad.remove(candidate);
+ // check this can really remove the element
+ // check whether nonLazyLoad remove affects candidateSet
+ // check nonLazyLoad sorted by version number from high to low
+ continue; // back to loop 2
+ }
+ }
+ }
+ }
+ }
+
+ private void calculateFirstPoint(
+ List<ChunkSuit4CPV> currentChunkList,
+ long startTime,
+ long endTime,
+ long interval,
+ long curStartTime)
+ throws IOException {
+ // IOMonitor2.M4_LSM_status = Operation.M4_LSM_FP;
+ while (currentChunkList.size() > 0) { // loop 1
+ // sorted by startTime and version, find FP candidate
+ currentChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() { // double check the sort order logic for different
+ // aggregations
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ int res =
+ ((Comparable) (o1.getStatistics().getStartTime()))
+ .compareTo(o2.getStatistics().getStartTime());
+ if (res != 0) {
+ return res;
+ } else {
+ return new MergeReaderPriority(
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
+ .compareTo(
+ new MergeReaderPriority(
+ o1.getChunkMetadata().getVersion(),
+ o1.getChunkMetadata().getOffsetOfChunkHeader()));
+ }
+ }
+ });
+
+ ChunkSuit4CPV susp_candidate = currentChunkList.get(0);
+ if (susp_candidate.isLazyLoad()) {
+ // means the chunk is already lazy loaded, then load the chunk, apply deletes, update
+ // statistics,
+ // cancel the lazy loaded mark, and back to loop 1
+ if (susp_candidate.getPageReader() == null) {
+ PageReader pageReader =
+ FileLoaderUtils.loadPageReaderList4CPV(
+ susp_candidate.getChunkMetadata(), this.timeFilter);
+ // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
+ // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+ // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+ // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGNED
+ // DIRECTLY), WHICH WILL INTRODUCE BUGS!
+ susp_candidate.setPageReader(pageReader);
+ }
+ // chunk data read operation (b): get the closest data point after or before a timestamp
+ susp_candidate.updateFPwithTheClosetPointEqualOrAfter(
+ susp_candidate.getStatistics().getStartTime());
+ susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!!
+ continue; // back to loop 1
+ } else {
+ // the chunk has not been lazy loaded, then verify whether the candidate point is deleted
+ // Note the higher versions of deletes are guaranteed by
+ // QueryUtils.modifyChunkMetaData(chunkMetadataList,pathModifications)
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ long candidateTimestamp = susp_candidate.getStatistics().getStartTime(); // check
+ Object candidateValue = susp_candidate.getStatistics().getFirstValue(); // check
+
+ boolean isDeletedItself = false;
+ long deleteEndTime = -1;
+ List<TimeRange> deleteIntervalList =
+ susp_candidate.getChunkMetadata().getDeleteIntervalList();
+ if (deleteIntervalList != null) {
+ int deleteCursor = 0;
+ while (deleteCursor < deleteIntervalList.size()) {
+ if (deleteIntervalList.get(deleteCursor).getMax() < candidateTimestamp) {
+ deleteCursor++;
+ } else if (deleteIntervalList.get(deleteCursor).contains(candidateTimestamp)) {
+ isDeletedItself = true;
+ deleteEndTime = deleteIntervalList.get(deleteCursor).getMax();
+ break; // since delete intervals are already sorted and merged
+ } else {
+ break; // since delete intervals are already sorted and merged
+ }
+ }
+ }
+ if (isDeletedItself) {
+ // deleteEndTime may be after the current endTime,
+ // because deleteStartTime can be after the startTime of the whole chunk
+ if (deleteEndTime
+ >= susp_candidate.getStatistics().getEndTime()) { // NOTE here calculate FP
+ // deleted as a whole
+ currentChunkList.remove(susp_candidate);
+ } else {
+ // the candidate point is deleted, then label the chunk as already lazy loaded,
+ // update chunkStartTime without loading data, and back to loop 1
+ susp_candidate.setLazyLoad(true);
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ susp_candidate.getStatistics().setStartTime(deleteEndTime + 1); // check
+ // +1 is because delete is closed interval
+ }
+ continue; // back to loop 1
+ } else {
+ // the candidate point is not deleted, then it is the final result
+ results
+ .get(0)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ results
+ .get(2)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ return;
+ }
+ }
+ }
+ }
+
+ private void calculateLastPoint(
+ List<ChunkSuit4CPV> currentChunkList,
+ long startTime,
+ long endTime,
+ long interval,
+ long curStartTime)
+ throws IOException {
+ // IOMonitor2.M4_LSM_status = Operation.M4_LSM_LP;
+ while (currentChunkList.size() > 0) { // loop 1
+ // sorted by endTime and version, find LP candidate
+ currentChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() { // double check the sort order logic for different
+ // aggregations
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ int res =
+ ((Comparable) (o2.getStatistics().getEndTime()))
+ .compareTo(o1.getStatistics().getEndTime());
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ if (res != 0) {
+ return res;
+ } else {
+ return new MergeReaderPriority(
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
+ .compareTo(
+ new MergeReaderPriority(
+ o1.getChunkMetadata().getVersion(),
+ o1.getChunkMetadata().getOffsetOfChunkHeader()));
+ }
+ }
+ });
+
+ ChunkSuit4CPV susp_candidate = currentChunkList.get(0);
+ if (susp_candidate.isLazyLoad()) {
+ // means the chunk is already lazy loaded, then load the chunk, apply deletes, update
+ // statistics,
+ // cancel the lazy loaded mark, and back to loop 1
+ if (susp_candidate.getPageReader() == null) {
+ PageReader pageReader =
+ FileLoaderUtils.loadPageReaderList4CPV(
+ susp_candidate.getChunkMetadata(), this.timeFilter);
+ // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
+ // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+ // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+ // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGNED
+ // DIRECTLY), WHICH WILL INTRODUCE BUGS!
+ susp_candidate.setPageReader(pageReader);
+ }
+ // update LP equal to or before statistics.getEndTime
+ // (b) get the closest data point after or before a timestamp
+ susp_candidate.updateLPwithTheClosetPointEqualOrBefore(
+ susp_candidate.getStatistics().getEndTime()); // DEBUG
+ susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!!
+ continue; // back to loop 1
+ } else {
+ // the chunk has not been lazy loaded, then verify whether the candidate point is deleted
+ // Note the higher versions of deletes are guaranteed by
+ // QueryUtils.modifyChunkMetaData(chunkMetadataList,pathModifications)
+ // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
+ long candidateTimestamp = susp_candidate.getStatistics().getEndTime(); // check
+ Object candidateValue = susp_candidate.getStatistics().getLastValue(); // check
+
+ boolean isDeletedItself = false;
+ long deleteStartTime = Long.MAX_VALUE; // check
+ List<TimeRange> deleteIntervalList =
+ susp_candidate.getChunkMetadata().getDeleteIntervalList();
+ if (deleteIntervalList != null) {
+ int deleteCursor = 0;
+ while (deleteCursor < deleteIntervalList.size()) {
+ if (deleteIntervalList.get(deleteCursor).getMax() < candidateTimestamp) {
+ deleteCursor++;
+ } else if (deleteIntervalList.get(deleteCursor).contains(candidateTimestamp)) {
+ isDeletedItself = true;
+ deleteStartTime = deleteIntervalList.get(deleteCursor).getMin();
+ break; // since delete intervals are already sorted and merged
+ } else {
+ break; // since delete intervals are already sorted and merged
+ }
+ }
+ }
+ if (isDeletedItself) {
+ // deleteStartTime may be before the current startTime,
+ // because deleteEndTime can be before the endTime of the whole chunk
+ if (deleteStartTime <= susp_candidate.getStatistics().getStartTime()) {
+ // NOTE here calculate LP.
+ // deleted as a whole
+ currentChunkList.remove(susp_candidate);
+ } else {
+ susp_candidate.setLazyLoad(true);
+ // NOTE here get statistics from ChunkSuit4CPV, not from
+ // ChunkSuit4CPV.ChunkMetadata
+ susp_candidate.getStatistics().setEndTime(deleteStartTime - 1);
+ // -1 is because delete is closed interval
+ // check
+ }
+ continue; // back to loop 1
+ } else {
+ // the candidate point is not deleted, then it is the final result
+ results
+ .get(1)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ results
+ .get(3)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+ return;
+ }
+ }
+ }
+ }
+
+ @Override
+ public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+ throws IOException {
+ throw new IOException("not implemented");
+ }
+
+ @Override
+ public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+ throws IOException, QueryProcessException {
+ throw new IOException("not implemented");
+ }
+
+ public List<ChunkSuit4CPV> getCurrentChunkList() {
+ return currentChunkList;
+ }
+
+ public List<ChunkSuit4CPV> getFutureChunkList() {
+ return futureChunkList;
+ }
+}
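
Note: the calculateBottomPoint/calculateTopPoint routines above follow a candidate-set pattern — sort chunks by the extreme value recorded in their statistics, collect the tied extremes as candidates, and lazily load chunk data only when deletions or higher-version overlapping chunks could invalidate a candidate. A stripped-down, self-contained sketch of the bottom-point selection, with a plain record in place of ChunkSuit4CPV (illustrative only; the delete-interval and version checks of the real executor are elided):

    import java.util.*;

    public class BottomPointSketch {
      // Stand-in for ChunkSuit4CPV statistics: the min value and its timestamp.
      record Chunk(long version, long bottomTime, double minValue) {}

      // Pick the bottom point whose timestamp lies in [curStart, curEnd).
      static Optional<Chunk> bottomPoint(List<Chunk> chunks, long curStart, long curEnd) {
        List<Chunk> sorted = new ArrayList<>(chunks);
        sorted.sort(Comparator.comparingDouble(Chunk::minValue)); // BP candidates first
        for (Chunk c : sorted) {
          // The real executor additionally verifies each candidate against its
          // delete intervals and against overlapping chunks of higher version.
          if (c.bottomTime() >= curStart && c.bottomTime() < curEnd) {
            return Optional.of(c);
          }
        }
        return Optional.empty();
      }

      public static void main(String[] args) {
        List<Chunk> chunks =
            List.of(new Chunk(1, 5, -3.0), new Chunk(2, 50, -7.0), new Chunk(3, 12, -1.5));
        // -7.0 at t=50 falls outside [0, 20), so -3.0 at t=5 is the bottom point.
        System.out.println(bottomPoint(chunks, 0, 20));
      }
    }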
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 337f990d42f..c745982c5a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -27,6 +27,8 @@ import java.nio.charset.Charset;
/** TSFileConfig is a configuration class. Every variable is public and has a default value. */
public class TSFileConfig implements Serializable {
+ private boolean enableMinMaxLSM = false;
+
private boolean useStatistics = true;
private boolean useTimeIndex = true;
@@ -157,6 +159,14 @@ public class TSFileConfig implements Serializable {
public TSFileConfig() {}
+ public boolean isEnableMinMaxLSM() {
+ return enableMinMaxLSM;
+ }
+
+ public void setEnableMinMaxLSM(boolean enableMinMaxLSM) {
+ this.enableMinMaxLSM = enableMinMaxLSM;
+ }
+
public boolean isUseStatistics() {
return useStatistics;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
index e4d874e4e76..3aa322b4e89 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
@@ -105,6 +105,10 @@ public class TSFileDescriptor {
Properties properties = new Properties();
try {
properties.load(inputStream);
+ conf.setEnableMinMaxLSM(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enableMinMaxLSM", Boolean.toString(conf.isEnableMinMaxLSM()))));
conf.setUseStatistics(
Boolean.parseBoolean(
properties.getProperty("use_Statistics", Boolean.toString(conf.isUseStatistics()))));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java
index fd160b3fd6f..c20a9de8994 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java
@@ -30,6 +30,8 @@ public class IOMonitor2 {
GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoTimeIndex, // cpv_noTimeIndex
GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoValueIndex, // cpv_noValueIndex
GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_NoTimeValueIndex, // cpv_noTimeValueIndex
+
+ GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_EnableMinMaxLSM, // MinMax-LSM
GroupByWithoutValueFilterDataSet_LocalGroupByExecutor_UseStatistics, // moc
GroupByWithoutValueFilterDataSet_LocalGroupByExecutor_NotUseStatistics // mac_groupBy
}