You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by le...@apache.org on 2022/07/01 05:35:52 UTC
[iotdb] branch research/M4-visualization updated: fix
This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/M4-visualization by this push:
new 38b532bf21 fix
38b532bf21 is described below
commit 38b532bf21ee27070fd8c9e35b9c1ce788efdaf1
Author: Lei Rui <10...@qq.com>
AuthorDate: Fri Jul 1 13:35:33 2022 +0800
fix
---
.../dataset/groupby/LocalGroupByExecutor4CPV.java | 1424 ++++++++++----------
.../iotdb/tsfile/read/reader/page/PageReader.java | 44 +-
2 files changed, 737 insertions(+), 731 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index bb9e837e0f..59feabe4e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -19,14 +19,6 @@
package org.apache.iotdb.db.query.dataset.groupby;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -59,6 +51,15 @@ import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
/**
* Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0),
* max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). Requirements: (1) Don't change the
@@ -75,7 +76,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
private List<ChunkSuit4CPV> currentChunkList;
private final List<ChunkSuit4CPV> futureChunkList = new ArrayList<>();
- // this is designed to keep the split chunk from futureChunkList, not destroying the sorted order of futureChunkList
+ // this is designed to keep the split chunk from futureChunkList, not destroying the sorted order
+ // of futureChunkList
private Map<Integer, List<ChunkSuit4CPV>> splitChunkList = new HashMap<>();
private Filter timeFilter;
@@ -138,8 +140,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
results.add(aggrResult);
}
- private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime,
- long startTime, long endTime, long interval) throws IOException {
+ private void getCurrentChunkListFromFutureChunkList(
+ long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+ throws IOException {
// empty currentChunkList
currentChunkList = new ArrayList<>();
@@ -151,7 +154,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
// iterate futureChunkList
ListIterator itr = futureChunkList.listIterator();
-// List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
+ // List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
while (itr.hasNext()) {
ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next());
ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
@@ -196,9 +199,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
/**
* @param curStartTime closed
- * @param curEndTime open
- * @param startTime closed
- * @param endTime open
+ * @param curEndTime open
+ * @param startTime closed
+ * @param endTime open
*/
@Override
public List<AggregateResult> calcResult(
@@ -223,9 +226,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return results;
}
- /**
- * 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中
- */
+ /** 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 */
private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) {
if (chunkSuit4CPV.getBatchData() != null) {
BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false);
@@ -322,8 +323,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version
public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -410,7 +411,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
// minValue[bottomTimestamp], maxValue[topTimestamp]
.updateResultUsingValues(
- new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
// TODO check updateResult
return; // 计算结束
} else { // 是被overlap,则partial scan所有这些overlap的块
@@ -454,7 +455,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
// minValue[bottomTimestamp], maxValue[topTimestamp]
.updateResultUsingValues(
- new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
// TODO check updateResult
return; // 计算结束
} else { // 找到这样的点,于是标记candidate point所在块为lazy
@@ -520,8 +521,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version
public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -608,7 +609,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
.get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
// minValue[bottomTimestamp], maxValue[topTimestamp]
.updateResultUsingValues(
- new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
// TODO check updateResult
return; // 计算结束
} else { // 是被overlap,则partial scan所有这些overlap的块
@@ -652,7 +653,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
.get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
// minValue[bottomTimestamp], maxValue[topTimestamp]
.updateResultUsingValues(
- new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
// TODO check updateResult
return; // 计算结束
} else { // 找到这样的点,于是标记candidate point所在块为lazy
@@ -700,8 +701,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return res;
} else {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -766,11 +767,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
results
.get(0)
.updateResultUsingValues(
- new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
results
.get(2)
.updateResultUsingValues(
- new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
return;
}
}
@@ -797,8 +798,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return res;
} else {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -863,687 +864,702 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
results
.get(1)
.updateResultUsingValues(
- new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
results
.get(3)
.updateResultUsingValues(
- new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
+ new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
return;
}
}
}
}
-// /**
-// * @param curStartTime closed
-// * @param curEndTime open
-// * @param startTime closed
-// * @param endTime open
-// */
-// public List<AggregateResult> calcResult_deprecated(
-// long curStartTime, long curEndTime, long startTime, long endTime, long interval)
-// throws IOException, QueryProcessException {
-// // System.out.println("====DEBUG====: calcResult for [" + curStartTime + "," + curEndTime +
-// // ")");
-//
-// // clear result cache
-// for (AggregateResult result : results) {
-// result.reset();
-// }
-// // empty currentChunkList
-// currentChunkList = new ArrayList<>();
-//
-// // System.out.println("====DEBUG====: deal with futureChunkList");
-//
-// ListIterator itr = futureChunkList.listIterator();
-// List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
-// while (itr.hasNext()) {
-// ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next());
-// ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
-// long chunkMinTime = chunkMetadata.getStartTime();
-// long chunkMaxTime = chunkMetadata.getEndTime();
-// if (chunkMinTime >= curEndTime && chunkMinTime < endTime) {
-// // the chunk falls on the right side of the current M4 interval Ii
-// continue;
-// } else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) {
-// // the chunk falls on the left side of the current M4 interval Ii
-// // or the chunk falls on the right side of the total query range
-// itr.remove();
-// } else if (chunkMinTime >= curStartTime && chunkMaxTime < curEndTime) {
-// // the chunk falls completely within the current M4 interval Ii
-// currentChunkList.add(chunkSuit4CPV);
-// itr.remove();
-// } else {
-// // the chunk partially overlaps in time with the current M4 interval Ii.
-// // load this chunk, split it on deletes and all w intervals.
-// // add to currentChunkList and futureChunkList.
-// itr.remove();
-// List<IPageReader> pageReaderList =
-// FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
-// for (IPageReader pageReader : pageReaderList) {
-// // assume only one page in a chunk
-// // assume all data on disk, no data in memory
-// ((PageReader) pageReader)
-// .split4CPV(
-// startTime,
-// endTime,
-// interval,
-// curStartTime,
-// currentChunkList,
-// tmpFutureChunkList,
-// chunkMetadata);
-// }
-//
-// // System.out.println(
-// // "====DEBUG====: load the chunk because overlaps the M4 interval. Version="
-// // + chunkMetadata.getVersion()
-// // + " "
-// // + chunkMetadata.getOffsetOfChunkHeader());
-// }
-// }
-// futureChunkList.addAll(tmpFutureChunkList);
-// tmpFutureChunkList = null;
-// itr = null;
-//
-// // System.out.println("====DEBUG====: deal with currentChunkList");
-//
-// if (currentChunkList.size() == 0) {
-// return results;
-// }
-//
-// boolean[] isFinal = new boolean[4]; // default false
-// do {
-// long[] timestamps = new long[4]; // firstTime, lastTime, bottomTime, topTime
-// Object[] values = new Object[4]; // firstValue, lastValue, bottomValue, topValue
-// PriorityMergeReader.MergeReaderPriority[] versions =
-// new PriorityMergeReader.MergeReaderPriority[4];
-// int[] listIdx = new int[4];
-// timestamps[0] = -1;
-// timestamps[1] = -1;
-// values[2] = null;
-// values[3] = null;
-//
-// // find candidate points
-// // System.out.println("====DEBUG====: find candidate points");
-// // TODO: may change the loop of generating candidate points? sort first??
-//
-// for (int j = 0; j < currentChunkList.size(); j++) {
-// ChunkMetadata chunkMetadata = currentChunkList.get(j).getChunkMetadata();
-// Statistics statistics = chunkMetadata.getStatistics();
-// MergeReaderPriority version =
-// new MergeReaderPriority(
-// chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
-// // update firstPoint
-// if (!isFinal[0]) {
-// if (timestamps[0] == -1
-// || (statistics.getStartTime() < timestamps[0])
-// || (statistics.getStartTime() == timestamps[0]
-// && version.compareTo(versions[0]) > 0)) {
-// timestamps[0] = statistics.getStartTime();
-// values[0] = statistics.getFirstValue();
-// versions[0] = version;
-// listIdx[0] = j;
-// }
-// }
-// // update lastPoint
-// if (!isFinal[1]) {
-// if (timestamps[1] == -1
-// || (statistics.getEndTime() > timestamps[1])
-// || (statistics.getEndTime() == timestamps[1] && version.compareTo(versions[1]) > 0)) {
-// timestamps[1] = statistics.getEndTime();
-// values[1] = statistics.getLastValue();
-// versions[1] = version;
-// listIdx[1] = j;
-// }
-// }
-// // update bottomPoint
-// if (!isFinal[2]) {
-// if (values[2] == null
-// || (((Comparable) (values[2])).compareTo(statistics.getMinValue()) > 0)) {
-// timestamps[2] = statistics.getBottomTimestamp();
-// values[2] = statistics.getMinValue();
-// versions[2] = version;
-// listIdx[2] = j;
-// }
-// }
-// // update topPoint
-// if (!isFinal[3]) {
-// if (values[3] == null
-// || (((Comparable) (values[3])).compareTo(statistics.getMaxValue()) < 0)) {
-// timestamps[3] = statistics.getTopTimestamp();
-// values[3] = statistics.getMaxValue();
-// versions[3] = version;
-// listIdx[3] = j;
-// }
-// }
-// }
-//
-// // System.out.println("====DEBUG====: verify candidate points");
-//
-// // verify candidate points.
-// // firstPoint and lastPoint are valid for sure.
-// // default results sequence: min_time(%s), max_time(%s), first_value(%s), last_value(%s),
-// // min_value(%s), max_value(%s)
-// if (!isFinal[0]) { // firstPoint
-// long firstTimestamp = timestamps[0];
-// ChunkMetadata firstChunkMetadata = currentChunkList.get(listIdx[0]).getChunkMetadata();
-// // check if the point is deleted:
-// List<TimeRange> firstDeleteIntervalList = firstChunkMetadata.getDeleteIntervalList();
-// boolean isDeletedItself = false;
-// if (firstDeleteIntervalList != null) {
-// for (TimeRange timeRange : firstDeleteIntervalList) {
-// if (timeRange.contains(firstTimestamp)) {
-// isDeletedItself = true;
-// break;
-// }
-// }
-// }
-// if (isDeletedItself) {
-// // System.out.println(
-// // "====DEBUG====: load the chunk because candidate firstPoint is actually
-// // deleted. Version="
-// // + firstChunkMetadata.getVersion()
-// // + " "
-// // + firstChunkMetadata.getOffsetOfChunkHeader());
-//
-// currentChunkList.remove(listIdx[0]);
-// List<IPageReader> pageReaderList =
-// FileLoaderUtils.loadPageReaderList(firstChunkMetadata, this.timeFilter);
-// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
-// ((PageReader) pageReader)
-// .split4CPV(
-// startTime,
-// endTime,
-// interval,
-// curStartTime,
-// currentChunkList,
-// null,
-// firstChunkMetadata);
-// }
-// continue; // next iteration to check currentChunkList
-// } else {
-// results
-// .get(0)
-// .updateResultUsingValues(
-// Arrays.copyOfRange(timestamps, 0, 1),
-// 1,
-// Arrays.copyOfRange(values, 0, 1)); // min_time
-// results
-// .get(2)
-// .updateResultUsingValues(
-// Arrays.copyOfRange(timestamps, 0, 1),
-// 1,
-// Arrays.copyOfRange(values, 0, 1)); // first_value
-// isFinal[0] = true;
-// // System.out.println("====DEBUG====: find firstPoint");
-// }
-// }
-// if (!isFinal[1]) { // lastPoint
-// long lastTimestamp = timestamps[1];
-// ChunkMetadata lastChunkMetadata = currentChunkList.get(listIdx[1]).getChunkMetadata();
-// // check if the point is deleted:
-// List<TimeRange> lastDeleteIntervalList = lastChunkMetadata.getDeleteIntervalList();
-// boolean isDeletedItself = false;
-// if (lastDeleteIntervalList != null) {
-// for (TimeRange timeRange : lastDeleteIntervalList) {
-// if (timeRange.contains(lastTimestamp)) {
-// isDeletedItself = true;
-// break;
-// }
-// }
-// }
-// if (isDeletedItself) {
-// // System.out.println(
-// // "====DEBUG====: load the chunk because candidate lastPoint is actually
-// // deleted. Version="
-// // + lastChunkMetadata.getVersion()
-// // + " "
-// // + lastChunkMetadata.getOffsetOfChunkHeader());
-//
-// currentChunkList.remove(listIdx[1]);
-// List<IPageReader> pageReaderList =
-// FileLoaderUtils.loadPageReaderList(lastChunkMetadata, this.timeFilter);
-// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
-// ((PageReader) pageReader)
-// .split4CPV(
-// startTime,
-// endTime,
-// interval,
-// curStartTime,
-// currentChunkList,
-// null,
-// lastChunkMetadata);
-// }
-// continue; // next iteration to check currentChunkList
-// } else {
-// results
-// .get(1)
-// .updateResultUsingValues(
-// Arrays.copyOfRange(timestamps, 1, 2),
-// 1,
-// Arrays.copyOfRange(values, 1, 2)); // min_time
-// results
-// .get(3)
-// .updateResultUsingValues(
-// Arrays.copyOfRange(timestamps, 1, 2),
-// 1,
-// Arrays.copyOfRange(values, 1, 2)); // first_value
-// isFinal[1] = true;
-//
-// // System.out.println("====DEBUG====: find lastPoint");
-// }
-// }
-// if (!isFinal[2]) { // bottomPoint
-// long bottomTimestamp = timestamps[2];
-// ChunkMetadata bottomChunkMetadata = currentChunkList.get(listIdx[2]).getChunkMetadata();
-// List<Long> mergedVersionList = currentChunkList.get(listIdx[2]).getMergeVersionList();
-// List<Long> mergedOffsetList = currentChunkList.get(listIdx[2]).getMergeOffsetList();
-// // check if the point is deleted:
-// List<TimeRange> bottomDeleteIntervalList = bottomChunkMetadata.getDeleteIntervalList();
-// boolean isDeletedItself = false;
-// if (bottomDeleteIntervalList != null) {
-// for (TimeRange timeRange : bottomDeleteIntervalList) {
-// if (timeRange.contains(bottomTimestamp)) {
-// isDeletedItself = true;
-// break;
-// }
-// }
-// }
-// if (isDeletedItself) {
-// // System.out.println(
-// // "====DEBUG====: load the chunk because candidate bottomPoint is actually
-// // deleted. Version="
-// // + bottomChunkMetadata.getVersion()
-// // + " "
-// // + bottomChunkMetadata.getOffsetOfChunkHeader());
-//
-// currentChunkList.remove(listIdx[2]);
-// List<IPageReader> pageReaderList =
-// FileLoaderUtils.loadPageReaderList(bottomChunkMetadata, this.timeFilter);
-// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
-// ((PageReader) pageReader)
-// .split4CPV(
-// startTime,
-// endTime,
-// interval,
-// curStartTime,
-// currentChunkList,
-// null,
-// bottomChunkMetadata);
-// }
-// continue; // next iteration to check currentChunkList
-// } else { // verify if it is overlapped by other chunks with larger version number and not in
-// // the deleted time interval
-// List<Integer> toMerge = new ArrayList<>();
-// for (int i = 0; i < currentChunkList.size(); i++) {
-// ChunkMetadata chunkMetadata = currentChunkList.get(i).getChunkMetadata();
-// MergeReaderPriority version =
-// new MergeReaderPriority(
-// chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
-// if (version.compareTo(versions[2]) <= 0) { // including bottomChunkMetadata
-// continue;
-// }
-// if (bottomTimestamp < chunkMetadata.getStartTime()
-// || bottomTimestamp > chunkMetadata.getEndTime()) {
-// continue;
-// }
-// boolean isMerged = false;
-// for (int k = 0; k < mergedVersionList.size(); k++) {
-// // these chunks are MARKED "merged" - not overlapped any more
-// if (mergedVersionList.get(k) == chunkMetadata.getVersion()
-// && mergedOffsetList.get(k) == chunkMetadata.getOffsetOfChunkHeader()) {
-// isMerged = true;
-// break;
-// }
-// }
-// if (isMerged) {
-// continue;
-// }
-// toMerge.add(i);
-// }
-// if (toMerge.isEmpty()) {
-// // System.out.println("====DEBUG====: find bottomPoint");
-//
-// results
-// .get(4)
-// .updateResultUsingValues(
-// Arrays.copyOfRange(timestamps, 2, 3),
-// 1,
-// Arrays.copyOfRange(values, 2, 3)); // min_value
-// isFinal[2] = true;
-// } else {
-// // deal with toMerge chunks: delete updated points
-// toMerge.add(listIdx[2]);
-// List<Long> newMergedVersionList = new ArrayList<>();
-// List<Long> newMergedOffsetList = new ArrayList<>();
-// for (int m : toMerge) { // to MARK these chunks are "merged" - not overlapped any more
-// ChunkMetadata tmpChunkMetadata = currentChunkList.get(m).getChunkMetadata();
-// newMergedVersionList.add(tmpChunkMetadata.getVersion());
-// newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
-// }
-// Map<MergeReaderPriority, BatchData> updateBatchDataMap = new HashMap<>();
-// Map<MergeReaderPriority, Statistics> statisticsMap = new HashMap<>();
-// for (int o = 0; o < toMerge.size(); o++) {
-// // create empty batchData
-// ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
-// ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
-// MergeReaderPriority mergeReaderPriority =
-// new MergeReaderPriority(
-// chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
-// BatchData batch1 = BatchDataFactory.createBatchData(tsDataType, true, false);
-// updateBatchDataMap.put(mergeReaderPriority, batch1);
-// // create empty statistics
-// Statistics statistics = null;
-// switch (tsDataType) {
-// case INT32:
-// statistics = new IntegerStatistics();
-// break;
-// case INT64:
-// statistics = new LongStatistics();
-// break;
-// case FLOAT:
-// statistics = new FloatStatistics();
-// break;
-// case DOUBLE:
-// statistics = new DoubleStatistics();
-// break;
-// default:
-// break;
-// }
-// statisticsMap.put(mergeReaderPriority, statistics);
-// // prepare mergeReader
-// if (chunkSuit4CPV.getBatchData() == null) {
-// List<IPageReader> pageReaderList =
-// FileLoaderUtils.loadPageReaderList(chunkMetadata, this.timeFilter);
-// List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
-// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
-// ((PageReader) pageReader)
-// .split4CPV(
-// startTime,
-// endTime,
-// interval,
-// curStartTime,
-// tmpCurrentChunkList,
-// null,
-// chunkMetadata);
-// }
-// currentChunkList.set(toMerge.get(o), tmpCurrentChunkList.get(0));
-// chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
-//
-// // System.out.println(
-// // "====DEBUG====: load chunk for update merge. Version="
-// // + chunkMetadata.getVersion()
-// // + " "
-// // + chunkMetadata.getOffsetOfChunkHeader());
-// }
-// mergeReader.addReader(
-// chunkSuit4CPV.getBatchData().getBatchDataIterator(),
-// new MergeReaderPriority(chunkSuit4CPV.getVersion(), chunkSuit4CPV.getOffset()));
-// }
-// while (mergeReader.hasNextTimeValuePair()) {
-// Pair<TimeValuePair, MergeReaderPriority> res = mergeReader.nextElement();
-// TimeValuePair ret = res.left;
-// // System.out.println(
-// // "====DEBUG====: merge for bottomPoint. (t,v)="
-// // + ret.getTimestamp()
-// // + ","
-// // + ret.getValue().getValue());
-// updateBatchDataMap
-// .get(res.right)
-// .putAnObject(ret.getTimestamp(), ret.getValue().getValue());
-// switch (tsDataType) {
-// case INT32:
-// statisticsMap
-// .get(res.right)
-// .update(ret.getTimestamp(), (int) ret.getValue().getValue());
-// break;
-// case INT64:
-// statisticsMap
-// .get(res.right)
-// .update(ret.getTimestamp(), (long) ret.getValue().getValue());
-// break;
-// case FLOAT:
-// statisticsMap
-// .get(res.right)
-// .update(ret.getTimestamp(), (float) ret.getValue().getValue());
-// break;
-// case DOUBLE:
-// statisticsMap
-// .get(res.right)
-// .update(ret.getTimestamp(), (double) ret.getValue().getValue());
-// break;
-// default:
-// throw new UnSupportedDataTypeException(String.valueOf(tsDataType));
-// }
-// }
-// mergeReader.close();
-//
-// for (int o = 0; o < toMerge.size(); o++) {
-// ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
-// // to MARK these chunks are "merged" - not overlapped any more
-// chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
-// chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
-// // update BatchData
-// MergeReaderPriority mergeReaderPriority =
-// new MergeReaderPriority(chunkSuit4CPV.getVersion(), chunkSuit4CPV.getOffset());
-// chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
-// chunkSuit4CPV
-// .getChunkMetadata()
-// .setStatistics(statisticsMap.get(mergeReaderPriority));
-// }
-// // System.out.println(
-// // "====DEBUG====: merged chunks are : version="
-// // + newMergedVersionList
-// // + " offsets="
-// // + newMergedOffsetList);
-// continue;
-// }
-// }
-// }
-//
-// if (!isFinal[3]) { // topPoint
-// long topTimestamp = timestamps[3];
-// ChunkMetadata topChunkMetadata = currentChunkList.get(listIdx[3]).getChunkMetadata();
-// List<Long> mergedVersionList = currentChunkList.get(listIdx[3]).getMergeVersionList();
-// List<Long> mergedOffsetList = currentChunkList.get(listIdx[3]).getMergeOffsetList();
-// // check if the point is deleted:
-// List<TimeRange> topDeleteIntervalList = topChunkMetadata.getDeleteIntervalList();
-// boolean isDeletedItself = false;
-// if (topDeleteIntervalList != null) {
-// for (TimeRange timeRange : topDeleteIntervalList) {
-// if (timeRange.contains(topTimestamp)) {
-// isDeletedItself = true;
-// break;
-// }
-// }
-// }
-// if (isDeletedItself) {
-// // System.out.println(
-// // "====DEBUG====: load the chunk because candidate topPoint is actually
-// // deleted. Version="
-// // + topChunkMetadata.getVersion()
-// // + " "
-// // + topChunkMetadata.getOffsetOfChunkHeader());
-//
-// currentChunkList.remove(listIdx[3]);
-// List<IPageReader> pageReaderList =
-// FileLoaderUtils.loadPageReaderList(topChunkMetadata, this.timeFilter);
-// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
-// ((PageReader) pageReader)
-// .split4CPV(
-// startTime,
-// endTime,
-// interval,
-// curStartTime,
-// currentChunkList,
-// null,
-// topChunkMetadata);
-// }
-// continue; // next iteration to check currentChunkList
-// } else { // verify if it is overlapped by other chunks with larger version number and not in
-// // the deleted time interval
-// List<Integer> toMerge = new ArrayList<>();
-// for (int i = 0; i < currentChunkList.size(); i++) {
-// ChunkMetadata chunkMetadata = currentChunkList.get(i).getChunkMetadata();
-// MergeReaderPriority version =
-// new MergeReaderPriority(
-// chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
-// if (version.compareTo(versions[3]) <= 0) { // including topChunkMetadata
-// continue;
-// }
-// if (topTimestamp < chunkMetadata.getStartTime()
-// || topTimestamp > chunkMetadata.getEndTime()) {
-// continue;
-// }
-// boolean isMerged = false;
-// for (int k = 0; k < mergedVersionList.size(); k++) {
-// if (mergedVersionList.get(k) == chunkMetadata.getVersion()
-// && mergedOffsetList.get(k) == chunkMetadata.getOffsetOfChunkHeader()) {
-// isMerged = true;
-// break;
-// }
-// }
-// if (isMerged) {
-// continue;
-// }
-// toMerge.add(i);
-// }
-// if (toMerge.isEmpty()) {
-// results
-// .get(5)
-// .updateResultUsingValues(
-// Arrays.copyOfRange(timestamps, 3, 4),
-// 1,
-// Arrays.copyOfRange(values, 3, 4)); // max_value
-// isFinal[3] = true;
-// // System.out.println("====DEBUG====: find topPoint");
-// return results;
-// } else {
-// // deal with toMerge chunks: delete updated points
-// toMerge.add(listIdx[3]);
-// List<Long> newMergedVersionList = new ArrayList<>();
-// List<Long> newMergedOffsetList = new ArrayList<>();
-// for (int m : toMerge) { // to MARK these chunks are "merged" - not overlapped any more
-// ChunkMetadata tmpChunkMetadata = currentChunkList.get(m).getChunkMetadata();
-// newMergedVersionList.add(tmpChunkMetadata.getVersion());
-// newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
-// }
-// Map<MergeReaderPriority, BatchData> updateBatchDataMap = new HashMap<>();
-// Map<MergeReaderPriority, Statistics> statisticsMap = new HashMap<>();
-// for (int o = 0; o < toMerge.size(); o++) {
-// // create empty batchData
-// ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
-// ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
-// MergeReaderPriority mergeReaderPriority =
-// new MergeReaderPriority(
-// chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
-// BatchData batch1 = BatchDataFactory.createBatchData(tsDataType, true, false);
-// updateBatchDataMap.put(mergeReaderPriority, batch1);
-// // create empty statistics
-// Statistics statistics = null;
-// switch (tsDataType) {
-// case INT32:
-// statistics = new IntegerStatistics();
-// break;
-// case INT64:
-// statistics = new LongStatistics();
-// break;
-// case FLOAT:
-// statistics = new FloatStatistics();
-// break;
-// case DOUBLE:
-// statistics = new DoubleStatistics();
-// break;
-// default:
-// break;
-// }
-// statisticsMap.put(mergeReaderPriority, statistics);
-// // prepare mergeReader
-// if (chunkSuit4CPV.getBatchData() == null) {
-// List<IPageReader> pageReaderList =
-// FileLoaderUtils.loadPageReaderList(chunkMetadata, this.timeFilter);
-// List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
-// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
-// ((PageReader) pageReader)
-// .split4CPV(
-// startTime,
-// endTime,
-// interval,
-// curStartTime,
-// tmpCurrentChunkList,
-// null,
-// chunkMetadata);
-// }
-// currentChunkList.set(toMerge.get(o), tmpCurrentChunkList.get(0));
-// chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
-//
-// // System.out.println(
-// // "====DEBUG====: load chunk for update merge. Version="
-// // + chunkMetadata.getVersion()
-// // + " "
-// // + chunkMetadata.getOffsetOfChunkHeader());
-// }
-// mergeReader.addReader(
-// chunkSuit4CPV.getBatchData().getBatchDataIterator(),
-// new MergeReaderPriority(chunkSuit4CPV.getVersion(), chunkSuit4CPV.getOffset()));
-// }
-// while (mergeReader.hasNextTimeValuePair()) {
-// Pair<TimeValuePair, MergeReaderPriority> res = mergeReader.nextElement();
-// TimeValuePair ret = res.left;
-// // System.out.println(
-// // "====DEBUG====: merge for topPoint. (t,v)="
-// // + ret.getTimestamp()
-// // + ","
-// // + ret.getValue().getValue());
-// updateBatchDataMap
-// .get(res.right)
-// .putAnObject(ret.getTimestamp(), ret.getValue().getValue());
-// switch (tsDataType) {
-// case INT32:
-// statisticsMap
-// .get(res.right)
-// .update(ret.getTimestamp(), (int) ret.getValue().getValue());
-// break;
-// case INT64:
-// statisticsMap
-// .get(res.right)
-// .update(ret.getTimestamp(), (long) ret.getValue().getValue());
-// break;
-// case FLOAT:
-// statisticsMap
-// .get(res.right)
-// .update(ret.getTimestamp(), (float) ret.getValue().getValue());
-// break;
-// case DOUBLE:
-// statisticsMap
-// .get(res.right)
-// .update(ret.getTimestamp(), (double) ret.getValue().getValue());
-// break;
-// default:
-// throw new UnSupportedDataTypeException(String.valueOf(tsDataType));
-// }
-// }
-// mergeReader.close();
-//
-// for (int o = 0; o < toMerge.size(); o++) {
-// ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
-// // to MARK these chunks are "merged" - not overlapped any more
-// chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
-// chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
-// // update BatchData
-// MergeReaderPriority mergeReaderPriority =
-// new MergeReaderPriority(chunkSuit4CPV.getVersion(), chunkSuit4CPV.getOffset());
-// chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
-// chunkSuit4CPV
-// .getChunkMetadata()
-// .setStatistics(statisticsMap.get(mergeReaderPriority));
-// }
-// continue;
-// }
-// }
-// }
-// } while (true);
-// }
+ // /**
+ // * @param curStartTime closed
+ // * @param curEndTime open
+ // * @param startTime closed
+ // * @param endTime open
+ // */
+ // public List<AggregateResult> calcResult_deprecated(
+ // long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+ // throws IOException, QueryProcessException {
+ // // System.out.println("====DEBUG====: calcResult for [" + curStartTime + "," + curEndTime
+ // +
+ // // ")");
+ //
+ // // clear result cache
+ // for (AggregateResult result : results) {
+ // result.reset();
+ // }
+ // // empty currentChunkList
+ // currentChunkList = new ArrayList<>();
+ //
+ // // System.out.println("====DEBUG====: deal with futureChunkList");
+ //
+ // ListIterator itr = futureChunkList.listIterator();
+ // List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
+ // while (itr.hasNext()) {
+ // ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next());
+ // ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ // long chunkMinTime = chunkMetadata.getStartTime();
+ // long chunkMaxTime = chunkMetadata.getEndTime();
+ // if (chunkMinTime >= curEndTime && chunkMinTime < endTime) {
+ // // the chunk falls on the right side of the current M4 interval Ii
+ // continue;
+ // } else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) {
+ // // the chunk falls on the left side of the current M4 interval Ii
+ // // or the chunk falls on the right side of the total query range
+ // itr.remove();
+ // } else if (chunkMinTime >= curStartTime && chunkMaxTime < curEndTime) {
+ // // the chunk falls completely within the current M4 interval Ii
+ // currentChunkList.add(chunkSuit4CPV);
+ // itr.remove();
+ // } else {
+ // // the chunk partially overlaps in time with the current M4 interval Ii.
+ // // load this chunk, split it on deletes and all w intervals.
+ // // add to currentChunkList and futureChunkList.
+ // itr.remove();
+ // List<IPageReader> pageReaderList =
+ // FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(),
+ // this.timeFilter);
+ // for (IPageReader pageReader : pageReaderList) {
+ // // assume only one page in a chunk
+ // // assume all data on disk, no data in memory
+ // ((PageReader) pageReader)
+ // .split4CPV(
+ // startTime,
+ // endTime,
+ // interval,
+ // curStartTime,
+ // currentChunkList,
+ // tmpFutureChunkList,
+ // chunkMetadata);
+ // }
+ //
+ // // System.out.println(
+ // // "====DEBUG====: load the chunk because overlaps the M4 interval. Version="
+ // // + chunkMetadata.getVersion()
+ // // + " "
+ // // + chunkMetadata.getOffsetOfChunkHeader());
+ // }
+ // }
+ // futureChunkList.addAll(tmpFutureChunkList);
+ // tmpFutureChunkList = null;
+ // itr = null;
+ //
+ // // System.out.println("====DEBUG====: deal with currentChunkList");
+ //
+ // if (currentChunkList.size() == 0) {
+ // return results;
+ // }
+ //
+ // boolean[] isFinal = new boolean[4]; // default false
+ // do {
+ // long[] timestamps = new long[4]; // firstTime, lastTime, bottomTime, topTime
+ // Object[] values = new Object[4]; // firstValue, lastValue, bottomValue, topValue
+ // PriorityMergeReader.MergeReaderPriority[] versions =
+ // new PriorityMergeReader.MergeReaderPriority[4];
+ // int[] listIdx = new int[4];
+ // timestamps[0] = -1;
+ // timestamps[1] = -1;
+ // values[2] = null;
+ // values[3] = null;
+ //
+ // // find candidate points
+ // // System.out.println("====DEBUG====: find candidate points");
+ // // TODO: may change the loop of generating candidate points? sort first??
+ //
+ // for (int j = 0; j < currentChunkList.size(); j++) {
+ // ChunkMetadata chunkMetadata = currentChunkList.get(j).getChunkMetadata();
+ // Statistics statistics = chunkMetadata.getStatistics();
+ // MergeReaderPriority version =
+ // new MergeReaderPriority(
+ // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
+ // // update firstPoint
+ // if (!isFinal[0]) {
+ // if (timestamps[0] == -1
+ // || (statistics.getStartTime() < timestamps[0])
+ // || (statistics.getStartTime() == timestamps[0]
+ // && version.compareTo(versions[0]) > 0)) {
+ // timestamps[0] = statistics.getStartTime();
+ // values[0] = statistics.getFirstValue();
+ // versions[0] = version;
+ // listIdx[0] = j;
+ // }
+ // }
+ // // update lastPoint
+ // if (!isFinal[1]) {
+ // if (timestamps[1] == -1
+ // || (statistics.getEndTime() > timestamps[1])
+ // || (statistics.getEndTime() == timestamps[1] && version.compareTo(versions[1]) >
+ // 0)) {
+ // timestamps[1] = statistics.getEndTime();
+ // values[1] = statistics.getLastValue();
+ // versions[1] = version;
+ // listIdx[1] = j;
+ // }
+ // }
+ // // update bottomPoint
+ // if (!isFinal[2]) {
+ // if (values[2] == null
+ // || (((Comparable) (values[2])).compareTo(statistics.getMinValue()) > 0)) {
+ // timestamps[2] = statistics.getBottomTimestamp();
+ // values[2] = statistics.getMinValue();
+ // versions[2] = version;
+ // listIdx[2] = j;
+ // }
+ // }
+ // // update topPoint
+ // if (!isFinal[3]) {
+ // if (values[3] == null
+ // || (((Comparable) (values[3])).compareTo(statistics.getMaxValue()) < 0)) {
+ // timestamps[3] = statistics.getTopTimestamp();
+ // values[3] = statistics.getMaxValue();
+ // versions[3] = version;
+ // listIdx[3] = j;
+ // }
+ // }
+ // }
+ //
+ // // System.out.println("====DEBUG====: verify candidate points");
+ //
+ // // verify candidate points.
+ // // firstPoint and lastPoint are valid for sure.
+ // // default results sequence: min_time(%s), max_time(%s), first_value(%s), last_value(%s),
+ // // min_value(%s), max_value(%s)
+ // if (!isFinal[0]) { // firstPoint
+ // long firstTimestamp = timestamps[0];
+ // ChunkMetadata firstChunkMetadata = currentChunkList.get(listIdx[0]).getChunkMetadata();
+ // // check if the point is deleted:
+ // List<TimeRange> firstDeleteIntervalList = firstChunkMetadata.getDeleteIntervalList();
+ // boolean isDeletedItself = false;
+ // if (firstDeleteIntervalList != null) {
+ // for (TimeRange timeRange : firstDeleteIntervalList) {
+ // if (timeRange.contains(firstTimestamp)) {
+ // isDeletedItself = true;
+ // break;
+ // }
+ // }
+ // }
+ // if (isDeletedItself) {
+ // // System.out.println(
+ // // "====DEBUG====: load the chunk because candidate firstPoint is
+ // actually
+ // // deleted. Version="
+ // // + firstChunkMetadata.getVersion()
+ // // + " "
+ // // + firstChunkMetadata.getOffsetOfChunkHeader());
+ //
+ // currentChunkList.remove(listIdx[0]);
+ // List<IPageReader> pageReaderList =
+ // FileLoaderUtils.loadPageReaderList(firstChunkMetadata, this.timeFilter);
+ // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
+ // ((PageReader) pageReader)
+ // .split4CPV(
+ // startTime,
+ // endTime,
+ // interval,
+ // curStartTime,
+ // currentChunkList,
+ // null,
+ // firstChunkMetadata);
+ // }
+ // continue; // next iteration to check currentChunkList
+ // } else {
+ // results
+ // .get(0)
+ // .updateResultUsingValues(
+ // Arrays.copyOfRange(timestamps, 0, 1),
+ // 1,
+ // Arrays.copyOfRange(values, 0, 1)); // min_time
+ // results
+ // .get(2)
+ // .updateResultUsingValues(
+ // Arrays.copyOfRange(timestamps, 0, 1),
+ // 1,
+ // Arrays.copyOfRange(values, 0, 1)); // first_value
+ // isFinal[0] = true;
+ // // System.out.println("====DEBUG====: find firstPoint");
+ // }
+ // }
+ // if (!isFinal[1]) { // lastPoint
+ // long lastTimestamp = timestamps[1];
+ // ChunkMetadata lastChunkMetadata = currentChunkList.get(listIdx[1]).getChunkMetadata();
+ // // check if the point is deleted:
+ // List<TimeRange> lastDeleteIntervalList = lastChunkMetadata.getDeleteIntervalList();
+ // boolean isDeletedItself = false;
+ // if (lastDeleteIntervalList != null) {
+ // for (TimeRange timeRange : lastDeleteIntervalList) {
+ // if (timeRange.contains(lastTimestamp)) {
+ // isDeletedItself = true;
+ // break;
+ // }
+ // }
+ // }
+ // if (isDeletedItself) {
+ // // System.out.println(
+ // // "====DEBUG====: load the chunk because candidate lastPoint is actually
+ // // deleted. Version="
+ // // + lastChunkMetadata.getVersion()
+ // // + " "
+ // // + lastChunkMetadata.getOffsetOfChunkHeader());
+ //
+ // currentChunkList.remove(listIdx[1]);
+ // List<IPageReader> pageReaderList =
+ // FileLoaderUtils.loadPageReaderList(lastChunkMetadata, this.timeFilter);
+ // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
+ // ((PageReader) pageReader)
+ // .split4CPV(
+ // startTime,
+ // endTime,
+ // interval,
+ // curStartTime,
+ // currentChunkList,
+ // null,
+ // lastChunkMetadata);
+ // }
+ // continue; // next iteration to check currentChunkList
+ // } else {
+ // results
+ // .get(1)
+ // .updateResultUsingValues(
+ // Arrays.copyOfRange(timestamps, 1, 2),
+ // 1,
+ // Arrays.copyOfRange(values, 1, 2)); // min_time
+ // results
+ // .get(3)
+ // .updateResultUsingValues(
+ // Arrays.copyOfRange(timestamps, 1, 2),
+ // 1,
+ // Arrays.copyOfRange(values, 1, 2)); // first_value
+ // isFinal[1] = true;
+ //
+ // // System.out.println("====DEBUG====: find lastPoint");
+ // }
+ // }
+ // if (!isFinal[2]) { // bottomPoint
+ // long bottomTimestamp = timestamps[2];
+ // ChunkMetadata bottomChunkMetadata = currentChunkList.get(listIdx[2]).getChunkMetadata();
+ // List<Long> mergedVersionList = currentChunkList.get(listIdx[2]).getMergeVersionList();
+ // List<Long> mergedOffsetList = currentChunkList.get(listIdx[2]).getMergeOffsetList();
+ // // check if the point is deleted:
+ // List<TimeRange> bottomDeleteIntervalList = bottomChunkMetadata.getDeleteIntervalList();
+ // boolean isDeletedItself = false;
+ // if (bottomDeleteIntervalList != null) {
+ // for (TimeRange timeRange : bottomDeleteIntervalList) {
+ // if (timeRange.contains(bottomTimestamp)) {
+ // isDeletedItself = true;
+ // break;
+ // }
+ // }
+ // }
+ // if (isDeletedItself) {
+ // // System.out.println(
+ // // "====DEBUG====: load the chunk because candidate bottomPoint is
+ // actually
+ // // deleted. Version="
+ // // + bottomChunkMetadata.getVersion()
+ // // + " "
+ // // + bottomChunkMetadata.getOffsetOfChunkHeader());
+ //
+ // currentChunkList.remove(listIdx[2]);
+ // List<IPageReader> pageReaderList =
+ // FileLoaderUtils.loadPageReaderList(bottomChunkMetadata, this.timeFilter);
+ // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
+ // ((PageReader) pageReader)
+ // .split4CPV(
+ // startTime,
+ // endTime,
+ // interval,
+ // curStartTime,
+ // currentChunkList,
+ // null,
+ // bottomChunkMetadata);
+ // }
+ // continue; // next iteration to check currentChunkList
+ // } else { // verify if it is overlapped by other chunks with larger version number and
+ // not in
+ // // the deleted time interval
+ // List<Integer> toMerge = new ArrayList<>();
+ // for (int i = 0; i < currentChunkList.size(); i++) {
+ // ChunkMetadata chunkMetadata = currentChunkList.get(i).getChunkMetadata();
+ // MergeReaderPriority version =
+ // new MergeReaderPriority(
+ // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
+ // if (version.compareTo(versions[2]) <= 0) { // including bottomChunkMetadata
+ // continue;
+ // }
+ // if (bottomTimestamp < chunkMetadata.getStartTime()
+ // || bottomTimestamp > chunkMetadata.getEndTime()) {
+ // continue;
+ // }
+ // boolean isMerged = false;
+ // for (int k = 0; k < mergedVersionList.size(); k++) {
+ // // these chunks are MARKED "merged" - not overlapped any more
+ // if (mergedVersionList.get(k) == chunkMetadata.getVersion()
+ // && mergedOffsetList.get(k) == chunkMetadata.getOffsetOfChunkHeader()) {
+ // isMerged = true;
+ // break;
+ // }
+ // }
+ // if (isMerged) {
+ // continue;
+ // }
+ // toMerge.add(i);
+ // }
+ // if (toMerge.isEmpty()) {
+ // // System.out.println("====DEBUG====: find bottomPoint");
+ //
+ // results
+ // .get(4)
+ // .updateResultUsingValues(
+ // Arrays.copyOfRange(timestamps, 2, 3),
+ // 1,
+ // Arrays.copyOfRange(values, 2, 3)); // min_value
+ // isFinal[2] = true;
+ // } else {
+ // // deal with toMerge chunks: delete updated points
+ // toMerge.add(listIdx[2]);
+ // List<Long> newMergedVersionList = new ArrayList<>();
+ // List<Long> newMergedOffsetList = new ArrayList<>();
+ // for (int m : toMerge) { // to MARK these chunks are "merged" - not overlapped any
+ // more
+ // ChunkMetadata tmpChunkMetadata = currentChunkList.get(m).getChunkMetadata();
+ // newMergedVersionList.add(tmpChunkMetadata.getVersion());
+ // newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
+ // }
+ // Map<MergeReaderPriority, BatchData> updateBatchDataMap = new HashMap<>();
+ // Map<MergeReaderPriority, Statistics> statisticsMap = new HashMap<>();
+ // for (int o = 0; o < toMerge.size(); o++) {
+ // // create empty batchData
+ // ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
+ // ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ // MergeReaderPriority mergeReaderPriority =
+ // new MergeReaderPriority(
+ // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
+ // BatchData batch1 = BatchDataFactory.createBatchData(tsDataType, true, false);
+ // updateBatchDataMap.put(mergeReaderPriority, batch1);
+ // // create empty statistics
+ // Statistics statistics = null;
+ // switch (tsDataType) {
+ // case INT32:
+ // statistics = new IntegerStatistics();
+ // break;
+ // case INT64:
+ // statistics = new LongStatistics();
+ // break;
+ // case FLOAT:
+ // statistics = new FloatStatistics();
+ // break;
+ // case DOUBLE:
+ // statistics = new DoubleStatistics();
+ // break;
+ // default:
+ // break;
+ // }
+ // statisticsMap.put(mergeReaderPriority, statistics);
+ // // prepare mergeReader
+ // if (chunkSuit4CPV.getBatchData() == null) {
+ // List<IPageReader> pageReaderList =
+ // FileLoaderUtils.loadPageReaderList(chunkMetadata, this.timeFilter);
+ // List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
+ // for (IPageReader pageReader : pageReaderList) { // assume only one page in a
+ // chunk
+ // ((PageReader) pageReader)
+ // .split4CPV(
+ // startTime,
+ // endTime,
+ // interval,
+ // curStartTime,
+ // tmpCurrentChunkList,
+ // null,
+ // chunkMetadata);
+ // }
+ // currentChunkList.set(toMerge.get(o), tmpCurrentChunkList.get(0));
+ // chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
+ //
+ // // System.out.println(
+ // // "====DEBUG====: load chunk for update merge. Version="
+ // // + chunkMetadata.getVersion()
+ // // + " "
+ // // + chunkMetadata.getOffsetOfChunkHeader());
+ // }
+ // mergeReader.addReader(
+ // chunkSuit4CPV.getBatchData().getBatchDataIterator(),
+ // new MergeReaderPriority(chunkSuit4CPV.getVersion(),
+ // chunkSuit4CPV.getOffset()));
+ // }
+ // while (mergeReader.hasNextTimeValuePair()) {
+ // Pair<TimeValuePair, MergeReaderPriority> res = mergeReader.nextElement();
+ // TimeValuePair ret = res.left;
+ // // System.out.println(
+ // // "====DEBUG====: merge for bottomPoint. (t,v)="
+ // // + ret.getTimestamp()
+ // // + ","
+ // // + ret.getValue().getValue());
+ // updateBatchDataMap
+ // .get(res.right)
+ // .putAnObject(ret.getTimestamp(), ret.getValue().getValue());
+ // switch (tsDataType) {
+ // case INT32:
+ // statisticsMap
+ // .get(res.right)
+ // .update(ret.getTimestamp(), (int) ret.getValue().getValue());
+ // break;
+ // case INT64:
+ // statisticsMap
+ // .get(res.right)
+ // .update(ret.getTimestamp(), (long) ret.getValue().getValue());
+ // break;
+ // case FLOAT:
+ // statisticsMap
+ // .get(res.right)
+ // .update(ret.getTimestamp(), (float) ret.getValue().getValue());
+ // break;
+ // case DOUBLE:
+ // statisticsMap
+ // .get(res.right)
+ // .update(ret.getTimestamp(), (double) ret.getValue().getValue());
+ // break;
+ // default:
+ // throw new UnSupportedDataTypeException(String.valueOf(tsDataType));
+ // }
+ // }
+ // mergeReader.close();
+ //
+ // for (int o = 0; o < toMerge.size(); o++) {
+ // ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
+ // // to MARK these chunks are "merged" - not overlapped any more
+ // chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
+ // chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
+ // // update BatchData
+ // MergeReaderPriority mergeReaderPriority =
+ // new MergeReaderPriority(chunkSuit4CPV.getVersion(),
+ // chunkSuit4CPV.getOffset());
+ // chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
+ // chunkSuit4CPV
+ // .getChunkMetadata()
+ // .setStatistics(statisticsMap.get(mergeReaderPriority));
+ // }
+ // // System.out.println(
+ // // "====DEBUG====: merged chunks are : version="
+ // // + newMergedVersionList
+ // // + " offsets="
+ // // + newMergedOffsetList);
+ // continue;
+ // }
+ // }
+ // }
+ //
+ // if (!isFinal[3]) { // topPoint
+ // long topTimestamp = timestamps[3];
+ // ChunkMetadata topChunkMetadata = currentChunkList.get(listIdx[3]).getChunkMetadata();
+ // List<Long> mergedVersionList = currentChunkList.get(listIdx[3]).getMergeVersionList();
+ // List<Long> mergedOffsetList = currentChunkList.get(listIdx[3]).getMergeOffsetList();
+ // // check if the point is deleted:
+ // List<TimeRange> topDeleteIntervalList = topChunkMetadata.getDeleteIntervalList();
+ // boolean isDeletedItself = false;
+ // if (topDeleteIntervalList != null) {
+ // for (TimeRange timeRange : topDeleteIntervalList) {
+ // if (timeRange.contains(topTimestamp)) {
+ // isDeletedItself = true;
+ // break;
+ // }
+ // }
+ // }
+ // if (isDeletedItself) {
+ // // System.out.println(
+ // // "====DEBUG====: load the chunk because candidate topPoint is actually
+ // // deleted. Version="
+ // // + topChunkMetadata.getVersion()
+ // // + " "
+ // // + topChunkMetadata.getOffsetOfChunkHeader());
+ //
+ // currentChunkList.remove(listIdx[3]);
+ // List<IPageReader> pageReaderList =
+ // FileLoaderUtils.loadPageReaderList(topChunkMetadata, this.timeFilter);
+ // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk
+ // ((PageReader) pageReader)
+ // .split4CPV(
+ // startTime,
+ // endTime,
+ // interval,
+ // curStartTime,
+ // currentChunkList,
+ // null,
+ // topChunkMetadata);
+ // }
+ // continue; // next iteration to check currentChunkList
+ // } else { // verify if it is overlapped by other chunks with larger version number and
+ // not in
+ // // the deleted time interval
+ // List<Integer> toMerge = new ArrayList<>();
+ // for (int i = 0; i < currentChunkList.size(); i++) {
+ // ChunkMetadata chunkMetadata = currentChunkList.get(i).getChunkMetadata();
+ // MergeReaderPriority version =
+ // new MergeReaderPriority(
+ // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
+ // if (version.compareTo(versions[3]) <= 0) { // including topChunkMetadata
+ // continue;
+ // }
+ // if (topTimestamp < chunkMetadata.getStartTime()
+ // || topTimestamp > chunkMetadata.getEndTime()) {
+ // continue;
+ // }
+ // boolean isMerged = false;
+ // for (int k = 0; k < mergedVersionList.size(); k++) {
+ // if (mergedVersionList.get(k) == chunkMetadata.getVersion()
+ // && mergedOffsetList.get(k) == chunkMetadata.getOffsetOfChunkHeader()) {
+ // isMerged = true;
+ // break;
+ // }
+ // }
+ // if (isMerged) {
+ // continue;
+ // }
+ // toMerge.add(i);
+ // }
+ // if (toMerge.isEmpty()) {
+ // results
+ // .get(5)
+ // .updateResultUsingValues(
+ // Arrays.copyOfRange(timestamps, 3, 4),
+ // 1,
+ // Arrays.copyOfRange(values, 3, 4)); // max_value
+ // isFinal[3] = true;
+ // // System.out.println("====DEBUG====: find topPoint");
+ // return results;
+ // } else {
+ // // deal with toMerge chunks: delete updated points
+ // toMerge.add(listIdx[3]);
+ // List<Long> newMergedVersionList = new ArrayList<>();
+ // List<Long> newMergedOffsetList = new ArrayList<>();
+ // for (int m : toMerge) { // to MARK these chunks are "merged" - not overlapped any
+ // more
+ // ChunkMetadata tmpChunkMetadata = currentChunkList.get(m).getChunkMetadata();
+ // newMergedVersionList.add(tmpChunkMetadata.getVersion());
+ // newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
+ // }
+ // Map<MergeReaderPriority, BatchData> updateBatchDataMap = new HashMap<>();
+ // Map<MergeReaderPriority, Statistics> statisticsMap = new HashMap<>();
+ // for (int o = 0; o < toMerge.size(); o++) {
+ // // create empty batchData
+ // ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
+ // ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ // MergeReaderPriority mergeReaderPriority =
+ // new MergeReaderPriority(
+ // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader());
+ // BatchData batch1 = BatchDataFactory.createBatchData(tsDataType, true, false);
+ // updateBatchDataMap.put(mergeReaderPriority, batch1);
+ // // create empty statistics
+ // Statistics statistics = null;
+ // switch (tsDataType) {
+ // case INT32:
+ // statistics = new IntegerStatistics();
+ // break;
+ // case INT64:
+ // statistics = new LongStatistics();
+ // break;
+ // case FLOAT:
+ // statistics = new FloatStatistics();
+ // break;
+ // case DOUBLE:
+ // statistics = new DoubleStatistics();
+ // break;
+ // default:
+ // break;
+ // }
+ // statisticsMap.put(mergeReaderPriority, statistics);
+ // // prepare mergeReader
+ // if (chunkSuit4CPV.getBatchData() == null) {
+ // List<IPageReader> pageReaderList =
+ // FileLoaderUtils.loadPageReaderList(chunkMetadata, this.timeFilter);
+ // List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
+ // for (IPageReader pageReader : pageReaderList) { // assume only one page in a
+ // chunk
+ // ((PageReader) pageReader)
+ // .split4CPV(
+ // startTime,
+ // endTime,
+ // interval,
+ // curStartTime,
+ // tmpCurrentChunkList,
+ // null,
+ // chunkMetadata);
+ // }
+ // currentChunkList.set(toMerge.get(o), tmpCurrentChunkList.get(0));
+ // chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
+ //
+ // // System.out.println(
+ // // "====DEBUG====: load chunk for update merge. Version="
+ // // + chunkMetadata.getVersion()
+ // // + " "
+ // // + chunkMetadata.getOffsetOfChunkHeader());
+ // }
+ // mergeReader.addReader(
+ // chunkSuit4CPV.getBatchData().getBatchDataIterator(),
+ // new MergeReaderPriority(chunkSuit4CPV.getVersion(),
+ // chunkSuit4CPV.getOffset()));
+ // }
+ // while (mergeReader.hasNextTimeValuePair()) {
+ // Pair<TimeValuePair, MergeReaderPriority> res = mergeReader.nextElement();
+ // TimeValuePair ret = res.left;
+ // // System.out.println(
+ // // "====DEBUG====: merge for topPoint. (t,v)="
+ // // + ret.getTimestamp()
+ // // + ","
+ // // + ret.getValue().getValue());
+ // updateBatchDataMap
+ // .get(res.right)
+ // .putAnObject(ret.getTimestamp(), ret.getValue().getValue());
+ // switch (tsDataType) {
+ // case INT32:
+ // statisticsMap
+ // .get(res.right)
+ // .update(ret.getTimestamp(), (int) ret.getValue().getValue());
+ // break;
+ // case INT64:
+ // statisticsMap
+ // .get(res.right)
+ // .update(ret.getTimestamp(), (long) ret.getValue().getValue());
+ // break;
+ // case FLOAT:
+ // statisticsMap
+ // .get(res.right)
+ // .update(ret.getTimestamp(), (float) ret.getValue().getValue());
+ // break;
+ // case DOUBLE:
+ // statisticsMap
+ // .get(res.right)
+ // .update(ret.getTimestamp(), (double) ret.getValue().getValue());
+ // break;
+ // default:
+ // throw new UnSupportedDataTypeException(String.valueOf(tsDataType));
+ // }
+ // }
+ // mergeReader.close();
+ //
+ // for (int o = 0; o < toMerge.size(); o++) {
+ // ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
+ // // to MARK these chunks are "merged" - not overlapped any more
+ // chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
+ // chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
+ // // update BatchData
+ // MergeReaderPriority mergeReaderPriority =
+ // new MergeReaderPriority(chunkSuit4CPV.getVersion(),
+ // chunkSuit4CPV.getOffset());
+ // chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
+ // chunkSuit4CPV
+ // .getChunkMetadata()
+ // .setStatistics(statisticsMap.get(mergeReaderPriority));
+ // }
+ // continue;
+ // }
+ // }
+ // }
+ // } while (true);
+ // }
@Override
public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 2e4541c28c..a0c1fef97e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -18,12 +18,6 @@
*/
package org.apache.iotdb.tsfile.read.reader.page;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
@@ -44,37 +38,34 @@ import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class PageReader implements IPageReader {
private PageHeader pageHeader;
protected TSDataType dataType;
- /**
- * decoder for value column
- */
+ /** decoder for value column */
protected Decoder valueDecoder;
- /**
- * decoder for time column
- */
+ /** decoder for time column */
protected Decoder timeDecoder;
- /**
- * time column in memory
- */
+ /** time column in memory */
protected ByteBuffer timeBuffer;
- /**
- * value column in memory
- */
+ /** value column in memory */
protected ByteBuffer valueBuffer;
protected Filter filter;
- /**
- * A list of deleted intervals.
- */
+ /** A list of deleted intervals. */
private List<TimeRange> deleteIntervalList;
private int deleteCursor = 0;
@@ -247,12 +238,13 @@ public class PageReader implements IPageReader {
new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
} else {
splitChunkList.computeIfAbsent(i, k -> new ArrayList<>());
- splitChunkList.get(i).add(
- new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
+ splitChunkList
+ .get(i)
+ .add(
+ new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
}
}
}
-
}
/**
@@ -273,9 +265,7 @@ public class PageReader implements IPageReader {
return false;
}
- /**
- * @return the returned BatchData may be empty, but never be null
- */
+ /** @return the returned BatchData may be empty, but never be null */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {