You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/02/29 10:03:04 UTC
[incubator-iotdb] 01/01: reorganize series reader and aggregate reader
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch optimize_series_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 9aa61760a9303f42b5bb3f285ccaca92964e5f04
Author: qiaojialin <64...@qq.com>
AuthorDate: Sat Feb 29 18:02:39 2020 +0800
reorganize series reader and aggregate reader
---
.../SystemDesign/5-DataQuery/2-SeriesReader.md | 14 +-
.../reader/series/SeriesRawDataBatchReader.java | 49 ++-
.../reader/series/SeriesRawDataPointReader.java | 43 +--
.../iotdb/db/query/reader/series/SeriesReader.java | 340 ++++++++++++---------
.../file/metadata/statistics/BinaryStatistics.java | 2 +-
.../metadata/statistics/BooleanStatistics.java | 2 +-
.../file/metadata/statistics/DoubleStatistics.java | 2 +-
.../file/metadata/statistics/FloatStatistics.java | 2 +-
.../metadata/statistics/IntegerStatistics.java | 2 +-
.../file/metadata/statistics/LongStatistics.java | 2 +-
.../file/metadata/statistics/Statistics.java | 5 +
11 files changed, 256 insertions(+), 207 deletions(-)
diff --git a/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md b/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
index 54b0f38..8bbc548 100644
--- a/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
+++ b/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
@@ -212,7 +212,7 @@ while (aggregateReader.hasNextChunk()) {
当前第一个chunk meta data的引用
-* private PriorityQueue<VersionPair<IPageReader>> overlappedPageReaders =
+* private PriorityQueue<VersionPair<IPageReader>> cachedPageReaders =
new PriorityQueue<>(
Comparator.comparingLong(pageReader -> pageReader.data.getStatistics().getStartTime()));
@@ -272,27 +272,27 @@ while (aggregateReader.hasNextChunk()) {
这个方法判断是否有下一个Page,一般在`firstChunkMetaData`不可直接使用时,继续解成Page。
-首先调用`fillOverlappedPageReaders()`去将`firstChunkMetaData`解开为`PageReader`,解开的`PageReader`都放进`overlappedPageReaders`里。并将`hasCachedFirstChunkMetadata`置为`false`,`firstChunkMetaData`置为`null`。若`overlappedPageReaders`为空则返回`false`,若不为空,返回`true`。
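+若`mergeReader`里仍有数据则抛出异常。否则,若`firstChunkMetaData`不为`null`,调用`unpackAllOverlappedChunkMetadataToCachedPageReaders()`,将`firstChunkMetaData`以及所有与之时间重叠的`ChunkMetaData`解开为`PageReader`,解开的`PageReader`都放进`cachedPageReaders`里,并将`firstChunkMetaData`置为`null`。然后若`firstPageReader`为空,从`cachedPageReaders`中取出起始时间最小的`PageReader`作为`firstPageReader`。若`firstPageReader`为空则返回`false`,若不为空,返回`true`。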
#### isPageOverlapped()
这个方法判断当前的Page有没有其他与之重叠的Page存在。
-如果`mergeReader`里仍然有数据,或者`seqChunkMetadatas`里有与`overlappedPageReaders`里第一个`PageReader`时间重叠的,或者`unseqChunkMetadatas`里有与`overlappedPageReaders`里第一个`PageReader`时间重叠的,则返回`true`;反之,返回`false`。
+如果`mergeReader`里仍然有数据,或者`seqChunkMetadatas`里有与`cachedPageReaders`里第一个`PageReader`时间重叠的,或者`unseqChunkMetadatas`里有与`cachedPageReaders`里第一个`PageReader`时间重叠的,则返回`true`;反之,返回`false`。
#### nextPage()
须与`isPageOverlapped()`方法搭配使用。
-当`overlappedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`overlappedPageReaders`的第一个Page里符合过滤条件的所有data。
+当`firstPageReader`没有与之重叠的其他Page时,直接获得`firstPageReader`里符合过滤条件的所有data。
#### currentPageStatistics()
-返回`overlappedPageReaders`里第一个Page的统计信息。
+返回`firstPageReader`的统计信息。
#### skipCurrentPage()
-跳过当前Page。只需要将`overlappedPageReaders`里第一个PageReader删掉即可。
+跳过当前Page。只需要将`firstPageReader`置为`null`即可。
#### hasNextOverlappedPage()
@@ -300,7 +300,7 @@ while (aggregateReader.hasNextChunk()) {
如果`hasCachedNextBatch`为`true`,直接返回`true`。
-否则,先调用`putAllDirectlyOverlappedPageReadersIntoMergeReader()`方法,将所有与`overlappedPageReaders`第一个Page有重叠的PageReader放进`mergeReader`里。`mergeReader`里维护了一个`currentLargestEndTime`变量,每次add进新的Reader时被更新,用以记录当前添加进`mergeReader`的最大的结束时间。
+否则,先调用`tryToPutAllDirectlyOverlappedPageReadersIntoMergeReader()`方法,将`firstPageReader`以及`cachedPageReaders`里所有与之有重叠的PageReader放进`mergeReader`里。`mergeReader`里维护了一个`currentLargestEndTime`变量,每次add进新的Reader时被更新,用以记录当前添加进`mergeReader`的最大的结束时间。
然后先从`mergeReader`里取出当前最大的结束时间,作为此次所要返回的batch的结束时间,记为`currentPageEndTime`。接着去遍历`mergeReader`,直到当前的时间戳大于`currentPageEndTime`。
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index 8e4c69f..9475b2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.series;
+import java.util.LinkedList;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -42,6 +43,7 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
private BatchData batchData;
private boolean hasCachedBatchData = false;
+
public SeriesRawDataBatchReader(SeriesReader seriesReader) {
this.seriesReader = seriesReader;
}
@@ -71,29 +73,46 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
return true;
}
- while (seriesReader.hasNextChunk()) {
- while (seriesReader.hasNextPage()) {
+ /*
+ * consume overlapped data firstly
+ */
+ if (seriesReader.hasNextOverlappedPage()) {
+ batchData = seriesReader.nextOverlappedPage();
+ hasCachedBatchData = true;
+ return true;
+ }
+
+
+ /*
+ * consume pages secondly
+ */
+ if (seriesReader.hasNextPage()) {
+ if (!seriesReader.isPageOverlapped()) {
+ batchData = seriesReader.nextPage();
+ } else if (seriesReader.hasNextOverlappedPage()) {
+ batchData = seriesReader.nextOverlappedPage();
+ }
+ hasCachedBatchData = true;
+ return true;
+ }
+
+ /*
+ * consume next chunk finally
+ */
+ if (seriesReader.hasNextChunk()) {
+ if (seriesReader.hasNextPage()) {
if (!seriesReader.isPageOverlapped()) {
batchData = seriesReader.nextPage();
- if (!batchData.hasCurrent()) {
- continue;
- }
- hasCachedBatchData = true;
- return true;
- }
- if (seriesReader.hasNextOverlappedPage()) {
+ } else if (seriesReader.hasNextOverlappedPage()) {
batchData = seriesReader.nextOverlappedPage();
- if (!batchData.hasCurrent()) {
- continue;
- }
- hasCachedBatchData = true;
- return true;
}
+ hasCachedBatchData = true;
}
}
- return false;
+ return hasCachedBatchData;
}
+
@Override
public BatchData nextBatch() throws IOException {
if (hasCachedBatchData || hasNextBatch()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPointReader.java
index 4d9941c..4be9c84 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPointReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataPointReader.java
@@ -26,37 +26,16 @@ import java.io.IOException;
public class SeriesRawDataPointReader implements IPointReader {
- private final SeriesReader seriesReader;
+ private final SeriesRawDataBatchReader batchReader;
private boolean hasCachedTimeValuePair;
private BatchData batchData;
private TimeValuePair timeValuePair;
public SeriesRawDataPointReader(SeriesReader seriesReader) {
- this.seriesReader = seriesReader;
+ this.batchReader = new SeriesRawDataBatchReader(seriesReader);
}
- private boolean hasNext() throws IOException {
- while (seriesReader.hasNextChunk()) {
- while (seriesReader.hasNextPage()) {
- if (seriesReader.hasNextOverlappedPage()) {
- return true;
- }
- }
- }
- return false;
- }
-
- private boolean hasNextSatisfiedInCurrentBatch() {
- if (batchData != null && batchData.hasCurrent()) {
- timeValuePair = new TimeValuePair(batchData.currentTime(),
- batchData.currentTsPrimitiveType());
- hasCachedTimeValuePair = true;
- batchData.next();
- return true;
- }
- return false;
- }
@Override
public boolean hasNextTimeValuePair() throws IOException {
@@ -64,17 +43,25 @@ public class SeriesRawDataPointReader implements IPointReader {
return true;
}
- if (hasNextSatisfiedInCurrentBatch()) {
+ if (batchData != null) {
+ timeValuePair = new TimeValuePair(batchData.currentTime(),
+ batchData.currentTsPrimitiveType());
+ hasCachedTimeValuePair = true;
+ batchData.next();
return true;
}
- // has not cached timeValuePair
- while (hasNext()) {
- batchData = seriesReader.nextOverlappedPage();
- if (hasNextSatisfiedInCurrentBatch()) {
+ while (batchReader.hasNextBatch()) {
+ batchData = batchReader.nextBatch();
+ if (batchData.hasCurrent()) {
+ timeValuePair = new TimeValuePair(batchData.currentTime(),
+ batchData.currentTsPrimitiveType());
+ hasCachedTimeValuePair = true;
+ batchData.next();
return true;
}
}
+
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 8b1f795..a299837 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -58,22 +58,38 @@ public class SeriesReader {
private final Filter timeFilter;
private final Filter valueFilter;
+
+ /*
+ * file cache
+ */
private final List<TsFileResource> seqFileResource;
private final PriorityQueue<TsFileResource> unseqFileResource;
+
+ /*
+ * chunk cache
+ */
+ private ChunkMetaData firstChunkMetaData;
private final List<ChunkMetaData> seqChunkMetadatas = new LinkedList<>();
private final PriorityQueue<ChunkMetaData> unseqChunkMetadatas =
new PriorityQueue<>(Comparator.comparingLong(ChunkMetaData::getStartTime));
- private boolean hasCachedFirstChunkMetadata;
- private ChunkMetaData firstChunkMetaData;
-
- private PriorityQueue<VersionPair<IPageReader>> overlappedPageReaders =
+ /*
+ * page cache
+ */
+ private VersionPair<IPageReader> firstPageReader;
+ private PriorityQueue<VersionPair<IPageReader>> cachedPageReaders =
new PriorityQueue<>(
Comparator.comparingLong(pageReader -> pageReader.data.getStatistics().getStartTime()));
+ /*
+ * point cache
+ */
private PriorityMergeReader mergeReader = new PriorityMergeReader();
+ /*
+ * result cache
+ */
private boolean hasCachedNextBatch;
private BatchData cachedBatchData;
@@ -104,49 +120,24 @@ public class SeriesReader {
public boolean hasNextChunk() throws IOException {
- if (hasCachedFirstChunkMetadata) {
- return true;
+
+ if (!cachedPageReaders.isEmpty() || firstPageReader != null || mergeReader.hasNextTimeValuePair()) {
+ throw new IOException("all cached pages should be consumed first");
}
+
// init first chunk metadata whose startTime is minimum
- tryToInitFirstChunk();
+ tryToUnpackAllOverlappedFilesToChunkMetadatas();
- return hasCachedFirstChunkMetadata;
+ return firstChunkMetaData != null;
}
- /**
- * Because seq data and unseq data intersect, the minimum startTime taken from two files at a time
- * is used as the reference time to start reading data
- */
- private void tryToInitFirstChunk() throws IOException {
- tryToFillChunkMetadatas();
- hasCachedFirstChunkMetadata = true;
- if (!seqChunkMetadatas.isEmpty() && unseqChunkMetadatas.isEmpty()) {
- // only has seq
- firstChunkMetaData = seqChunkMetadatas.remove(0);
- } else if (seqChunkMetadatas.isEmpty() && !unseqChunkMetadatas.isEmpty()) {
- // only has unseq
- firstChunkMetaData = unseqChunkMetadatas.poll();
- } else if (!seqChunkMetadatas.isEmpty()) {
- // has seq and unseq
- if (seqChunkMetadatas.get(0).getStartTime() <= unseqChunkMetadatas.peek().getStartTime()) {
- firstChunkMetaData = seqChunkMetadatas.remove(0);
- } else {
- firstChunkMetaData = unseqChunkMetadatas.poll();
- }
- } else {
- // no seq nor unseq
- hasCachedFirstChunkMetadata = false;
- }
- tryToFillChunkMetadatas();
- }
public boolean isChunkOverlapped() {
Statistics chunkStatistics = firstChunkMetaData.getStatistics();
- return mergeReader.hasNextTimeValuePair()
- || (!seqChunkMetadatas.isEmpty()
- && chunkStatistics.getEndTime() >= seqChunkMetadatas.get(0).getStartTime())
- || (!unseqChunkMetadatas.isEmpty()
- && chunkStatistics.getEndTime() >= unseqChunkMetadatas.peek().getStartTime());
+ return !seqChunkMetadatas.isEmpty()
+ && chunkStatistics.getEndTime() >= seqChunkMetadatas.get(0).getStartTime()
+ || !unseqChunkMetadatas.isEmpty()
+ && chunkStatistics.getEndTime() >= unseqChunkMetadatas.peek().getStartTime();
}
public Statistics currentChunkStatistics() {
@@ -154,32 +145,52 @@ public class SeriesReader {
}
public void skipCurrentChunk() {
- hasCachedFirstChunkMetadata = false;
firstChunkMetaData = null;
}
/**
- * This method should be called after hasNxtChunk
- * @return
- * @throws IOException
+ * This method should be called after hasNextChunk()
+ * make sure that all overlapped pages are consumed before
*/
public boolean hasNextPage() throws IOException {
- if (!overlappedPageReaders.isEmpty()) {
- return true;
+ if (mergeReader.hasNextTimeValuePair()) {
+ throw new IOException("all overlapped pages should be consumed first");
}
- fillOverlappedPageReaders();
+ if (firstChunkMetaData != null) {
+ /*
+ * try to unpack all overlapped ChunkMetadata to cachedPageReaders
+ */
+ unpackAllOverlappedChunkMetadataToCachedPageReaders(firstChunkMetaData.getEndTime());
+ } else {
+ /*
+ * first chunk metadata is already unpacked
+ */
+ if (firstPageReader == null && !cachedPageReaders.isEmpty()) {
+ firstPageReader = cachedPageReaders.poll();
+ }
+ }
- return !overlappedPageReaders.isEmpty();
+ return firstPageReader != null;
}
- private void fillOverlappedPageReaders() throws IOException {
- if (!hasCachedFirstChunkMetadata) {
- return;
+
+ private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long endTime) throws IOException {
+ while (!seqChunkMetadatas.isEmpty() && endTime >= seqChunkMetadatas.get(0).getStartTime()) {
+ unpackOneChunkMetaData(seqChunkMetadatas.remove(0));
+ }
+ while (!unseqChunkMetadatas.isEmpty() && endTime >= unseqChunkMetadatas.peek().getStartTime()) {
+ unpackOneChunkMetaData(unseqChunkMetadatas.poll());
+ }
+
+ if (firstChunkMetaData != null && endTime >= firstChunkMetaData.getStartTime()) {
+ unpackOneChunkMetaData(firstChunkMetaData);
+ firstChunkMetaData = null;
+ }
+
+ if (firstPageReader == null && !cachedPageReaders.isEmpty()) {
+ firstPageReader = cachedPageReaders.poll();
}
- unpackOneChunkMetaData(firstChunkMetaData);
- hasCachedFirstChunkMetadata = false;
- firstChunkMetaData = null;
}
private void unpackOneChunkMetaData(ChunkMetaData chunkMetaData) throws IOException {
@@ -187,26 +198,46 @@ public class SeriesReader {
.getPageReaderList()
.forEach(
pageReader ->
- overlappedPageReaders.add(
+ cachedPageReaders.add(
new VersionPair(chunkMetaData.getVersion(), pageReader)));
}
+ /**
+ * This method should be called after calling hasNextPage.
+ */
+ protected boolean isPageOverlapped() throws IOException {
+ if (firstPageReader == null) {
+ throw new IOException("no next page, make sure hasNextPage() is true");
+ }
+
+ Statistics firstPageStatistics = firstPageReader.data.getStatistics();
+
+ return !cachedPageReaders.isEmpty() &&
+ firstPageStatistics.getEndTime() >= cachedPageReaders.peek().data.getStatistics()
+ .getStartTime();
+ }
/**
* This method should only be used when the method isPageOverlapped() return true.
- * @return
- * @throws IOException
*/
protected BatchData nextPage() throws IOException {
- if (overlappedPageReaders.isEmpty()) {
- throw new IOException("overlappedPageReaders is empty, hasNextPage and isPageOverlapped methods should be called first");
+ if (isPageOverlapped()) {
+ throw new IOException("next page is overlapped, make sure isPageOverlapped is false");
}
- BatchData pageData = overlappedPageReaders.poll().data.getAllSatisfiedPageData();
- // only need to consider valueFilter because timeFilter has been set into the page reader
+ BatchData pageData = firstPageReader.data.getAllSatisfiedPageData();
+
+ /*
+ * no value filter
+ * only need to consider valueFilter because timeFilter has been set into the page reader
+ */
if (valueFilter == null) {
return pageData;
}
+
+ /*
+ * has value filter
+ */
BatchData batchData = new BatchData(pageData.getDataType());
while (pageData.hasCurrent()) {
if (valueFilter.satisfy(pageData.currentTime(), pageData.currentValue())) {
@@ -217,39 +248,19 @@ public class SeriesReader {
return batchData;
}
- /**
- * This method should be called after calling hasNextPage.
- * @return
- * @throws IOException
- */
- protected boolean isPageOverlapped() throws IOException {
- if (overlappedPageReaders.isEmpty()) {
- throw new IOException("overlappedPageReaders is empty, hasNextPage method should be called first");
- }
-
- Statistics pageStatistics = overlappedPageReaders.peek().data.getStatistics();
- return mergeReader.hasNextTimeValuePair()
- || (!seqChunkMetadatas.isEmpty()
- && pageStatistics.getEndTime() >= seqChunkMetadatas.get(0).getStartTime())
- || (!unseqChunkMetadatas.isEmpty()
- && pageStatistics.getEndTime() >= unseqChunkMetadatas.peek().getStartTime());
- }
-
public Statistics currentPageStatistics() throws IOException {
- if (overlappedPageReaders.isEmpty() || overlappedPageReaders.peek().data == null) {
- throw new IOException("No next page statistics.");
+ if (firstPageReader == null) {
+ throw new IOException("No next page.");
}
- return overlappedPageReaders.peek().data.getStatistics();
+ return firstPageReader.data.getStatistics();
}
public void skipCurrentPage() {
- overlappedPageReaders.poll();
+ firstPageReader = null;
}
/**
* This method should be called after hasNextChunk and hasNextPage methods.
- * @return
- * @throws IOException
*/
public boolean hasNextOverlappedPage() throws IOException {
@@ -257,93 +268,80 @@ public class SeriesReader {
return true;
}
- putAllDirectlyOverlappedPageReadersIntoMergeReader();
+ tryToPutAllDirectlyOverlappedPageReadersIntoMergeReader();
if (mergeReader.hasNextTimeValuePair()) {
cachedBatchData = new BatchData(dataType);
long currentPageEndTime = mergeReader.getCurrentLargestEndTime();
+
while (mergeReader.hasNextTimeValuePair()) {
+
TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
+
if (timeValuePair.getTimestamp() > currentPageEndTime) {
break;
}
- // unpack all overlapped chunks
- while (true) {
- tryToFillChunkMetadatas();
- boolean hasOverlappedChunkMetadata = false;
- if (!seqChunkMetadatas.isEmpty()
- && timeValuePair.getTimestamp() >= seqChunkMetadatas.get(0).getStartTime()) {
- unpackOneChunkMetaData(seqChunkMetadatas.remove(0));
- hasOverlappedChunkMetadata = true;
- }
- if (!unseqChunkMetadatas.isEmpty()
- && timeValuePair.getTimestamp() >= unseqChunkMetadatas.peek().getStartTime()) {
- unpackOneChunkMetaData(unseqChunkMetadatas.poll());
- hasOverlappedChunkMetadata = true;
- }
- if (!hasOverlappedChunkMetadata) {
- break;
- }
- }
- // put all overlapped pages into merge reader
- while (!overlappedPageReaders.isEmpty()
- && timeValuePair.getTimestamp()
- >= overlappedPageReaders.peek().data.getStatistics().getStartTime()) {
- VersionPair<IPageReader> pageReader = overlappedPageReaders.poll();
- mergeReader.addReader(
- pageReader.data.getAllSatisfiedPageData().getBatchDataIterator(), pageReader.version,
- pageReader.data.getStatistics().getEndTime());
- }
+ unpackAllOverlappedTsFilesToChunkMetadatas(timeValuePair.getTimestamp());
+ unpackAllOverlappedChunkMetadataToCachedPageReaders(timeValuePair.getTimestamp());
+ unpackAllOverlappedCachedPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+ cachedBatchData.putAnObject(
+ timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
+
+ mergeReader.nextTimeValuePair();
- timeValuePair = mergeReader.nextTimeValuePair();
- if (valueFilter == null || valueFilter
- .satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
- cachedBatchData.putAnObject(
- timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
- }
}
hasCachedNextBatch = cachedBatchData.hasCurrent();
}
return hasCachedNextBatch;
}
- private void putAllDirectlyOverlappedPageReadersIntoMergeReader() throws IOException {
+ private void tryToPutAllDirectlyOverlappedPageReadersIntoMergeReader() throws IOException {
+
+ /*
+ * no cached page readers
+ */
+ if (firstPageReader == null && cachedPageReaders.isEmpty()) {
+ return;
+ }
+
+ /*
+ * init firstPageReader
+ */
+ if (firstPageReader == null) {
+ firstPageReader = cachedPageReaders.poll();
+ }
+
long currentPageEndTime;
if (mergeReader.hasNextTimeValuePair()) {
currentPageEndTime = mergeReader.getCurrentLargestEndTime();
- } else if (!overlappedPageReaders.isEmpty()) {
- // put the first page into merge reader
- currentPageEndTime = overlappedPageReaders.peek().data.getStatistics().getEndTime();
- VersionPair<IPageReader> pageReader = overlappedPageReaders.poll();
- mergeReader.addReader(
- pageReader.data.getAllSatisfiedPageData().getBatchDataIterator(), pageReader.version,
- pageReader.data.getStatistics().getEndTime());
} else {
- return;
+ // put the first page into merge reader
+ currentPageEndTime = firstPageReader.data.getStatistics().getEndTime();
}
- // unpack all overlapped seq chunk meta data into overlapped page readers
- while (!seqChunkMetadatas.isEmpty()
- && currentPageEndTime >= seqChunkMetadatas.get(0).getStartTime()) {
- unpackOneChunkMetaData(seqChunkMetadatas.remove(0));
- tryToFillChunkMetadatas();
+ /*
+ * put all currently directly overlapped page reader to merge reader
+ */
+ unpackAllOverlappedCachedPageReadersToMergeReader(currentPageEndTime);
+ }
+
+ private void unpackAllOverlappedCachedPageReadersToMergeReader(long endTime) throws IOException {
+ while (!cachedPageReaders.isEmpty() && endTime >= cachedPageReaders.peek().data
+ .getStatistics().getStartTime()) {
+ putPageReaderToMergeReader(cachedPageReaders.poll());
}
- // unpack all overlapped unseq chunk meta data into overlapped page readers
- while (!unseqChunkMetadatas.isEmpty()
- && currentPageEndTime >= unseqChunkMetadatas.peek().getStartTime()) {
- unpackOneChunkMetaData(unseqChunkMetadatas.poll());
- tryToFillChunkMetadatas();
+ if (firstPageReader != null && endTime >= firstPageReader.data.getStatistics().getStartTime()) {
+ putPageReaderToMergeReader(firstPageReader);
+ firstPageReader = null;
}
+ }
- // put all page that directly overlapped with first page into merge reader
- while (!overlappedPageReaders.isEmpty()
- && currentPageEndTime >= overlappedPageReaders.peek().data.getStatistics().getStartTime()) {
- VersionPair<IPageReader> pageReader = overlappedPageReaders.poll();
- mergeReader.addReader(
- pageReader.data.getAllSatisfiedPageData().getBatchDataIterator(), pageReader.version,
- pageReader.data.getStatistics().getEndTime());
- }
+ private void putPageReaderToMergeReader(VersionPair<IPageReader> pageReader) throws IOException {
+ mergeReader.addReader(
+ pageReader.data.getAllSatisfiedPageData().getBatchDataIterator(),
+ pageReader.version, pageReader.data.getStatistics().getEndTime());
}
public BatchData nextOverlappedPage() throws IOException {
@@ -426,26 +424,66 @@ public class SeriesReader {
return unseqTsFilesSet;
}
+
/**
+ *
+ * unpack all overlapped seq/unseq files and find the first chunk metadata
+ *
* Because there may be too many files in the scenario used by the user, we cannot open all the
* chunks at once, which may cause OOM, so we can only unpack one file at a time when needed. This
* approach is likely to be ubiquitous, but it keeps the system running smoothly
*/
- private void tryToFillChunkMetadatas() throws IOException {
- // Fill sequence chunkMetadatas until it is not empty
+ private void tryToUnpackAllOverlappedFilesToChunkMetadatas() throws IOException {
+
+ /*
+ * Fill sequence chunkMetadatas until it is not empty
+ */
while (seqChunkMetadatas.isEmpty() && !seqFileResource.isEmpty()) {
seqChunkMetadatas.addAll(loadSatisfiedChunkMetadatas(seqFileResource.remove(0)));
}
- // Fill unsequence chunkMetadatas until there are no overlapped unseqFileResources
+ /*
+ * Fill unsequence chunkMetadatas until it is not empty
+ */
while (unseqChunkMetadatas.isEmpty() && !unseqFileResource.isEmpty()) {
unseqChunkMetadatas.addAll(loadSatisfiedChunkMetadatas(unseqFileResource.poll()));
}
- while (!unseqChunkMetadatas.isEmpty() && !unseqFileResource.isEmpty()
- && unseqChunkMetadatas.peek().getEndTime() >=
- unseqFileResource.peek().getStartTimeMap().get(seriesPath.getDevice())) {
+
+ /*
+ * find first chunk metadata
+ */
+ if (!seqChunkMetadatas.isEmpty() && unseqChunkMetadatas.isEmpty()) {
+ // only has seq
+ firstChunkMetaData = seqChunkMetadatas.remove(0);
+ } else if (seqChunkMetadatas.isEmpty() && !unseqChunkMetadatas.isEmpty()) {
+ // only has unseq
+ firstChunkMetaData = unseqChunkMetadatas.poll();
+ } else if (!seqChunkMetadatas.isEmpty()) {
+ // has seq and unseq
+ if (seqChunkMetadatas.get(0).getStartTime() <= unseqChunkMetadatas.peek().getStartTime()) {
+ firstChunkMetaData = seqChunkMetadatas.remove(0);
+ } else {
+ firstChunkMetaData = unseqChunkMetadatas.poll();
+ }
+ }
+
+ /*
+ * unpack all directly overlapped seq/unseq files with first chunk metadata
+ */
+ if (firstChunkMetaData != null) {
+ unpackAllOverlappedTsFilesToChunkMetadatas(firstChunkMetaData.getEndTime());
+ }
+ }
+
+ private void unpackAllOverlappedTsFilesToChunkMetadatas(long endTime) throws IOException {
+ while (!unseqFileResource.isEmpty() && endTime >=
+ unseqFileResource.peek().getStartTimeMap().get(seriesPath.getDevice())) {
unseqChunkMetadatas.addAll(loadSatisfiedChunkMetadatas(unseqFileResource.poll()));
}
+ while (!seqFileResource.isEmpty() && endTime >=
+ seqFileResource.get(0).getStartTimeMap().get(seriesPath.getDevice())) {
+ seqChunkMetadatas.addAll(loadSatisfiedChunkMetadatas(seqFileResource.remove(0)));
+ }
}
public void setTimeFilter(long timestamp) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
index 3c0a3e4..64f1642 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java
@@ -190,7 +190,7 @@ public class BinaryStatistics extends Statistics<Binary> {
@Override
public String toString() {
- return "[fistValue:" + firstValue + ",lastValue:" + lastValue + "]";
+ return super.toString() + " [fistValue:" + firstValue + ",lastValue:" + lastValue + "]";
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
index 96d6c63..fa879c6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java
@@ -186,6 +186,6 @@ public class BooleanStatistics extends Statistics<Boolean> {
@Override
public String toString() {
- return "[firstValue:" + firstValue + ",lastValue:" + lastValue + "]";
+ return super.toString() + " [firstValue:" + firstValue + ",lastValue:" + lastValue + "]";
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
index 85ae01e..965419f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
@@ -215,7 +215,7 @@ public class DoubleStatistics extends Statistics<Double> {
@Override
public String toString() {
- return "[minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
+ return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
",lastValue:" + lastValue + ",sumValue:" + sumValue + "]";
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
index 4daaf77..f7ad1f8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java
@@ -209,7 +209,7 @@ public class FloatStatistics extends Statistics<Float> {
@Override
public String toString() {
- return "[minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
+ return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
",lastValue:" + lastValue + ",sumValue:" + sumValue + "]";
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
index 3bf7a5d..f11d9da 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java
@@ -211,7 +211,7 @@ public class IntegerStatistics extends Statistics<Integer> {
@Override
public String toString() {
- return "[minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
+ return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
",lastValue:" + lastValue + ",sumValue:" + sumValue + "]";
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
index 5741715..c270676 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java
@@ -217,7 +217,7 @@ public class LongStatistics extends Statistics<Long> {
@Override
public String toString() {
- return "[minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
+ return super.toString() + " [minValue:" + minValue + ",maxValue:" + maxValue + ",firstValue:" + firstValue +
",lastValue:" + lastValue + ",sumValue:" + sumValue + "]";
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
index 4f4e6e8..244530b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -414,6 +414,11 @@ public abstract class Statistics<T> {
}
@Override
+ public String toString() {
+ return "startTime: " + startTime + " endTime: " + endTime;
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o) {
return true;