You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/28 01:06:57 UTC
[iotdb] branch master updated: [IOTDB-2445] [IOTDB-2502] Fix overlapped data should be consumed firstly bug (#4990)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 56c0d7c [IOTDB-2445] [IOTDB-2502] Fix overlapped data should be consumed firstly bug (#4990)
56c0d7c is described below
commit 56c0d7c7b33a5718a0b72bd47477e4ee10aac744
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Jan 28 09:05:54 2022 +0800
[IOTDB-2445] [IOTDB-2502] Fix overlapped data should be consumed firstly bug (#4990)
---
.../iotdb/db/query/reader/series/SeriesReader.java | 84 +++++++++++-----------
1 file changed, 40 insertions(+), 44 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index e9d7bc8..02058aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -500,7 +500,7 @@ public class SeriesReader {
/*
* first chunk metadata is already unpacked, consume cached pages
*/
- unpackFirstPageReader();
+ initFirstPageReader();
}
if (isExistOverlappedPage()) {
@@ -511,7 +511,7 @@ public class SeriesReader {
// readers
while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
- unpackFirstPageReader();
+ initFirstPageReader();
if (isExistOverlappedPage()) {
return true;
@@ -520,16 +520,6 @@ public class SeriesReader {
return firstPageReader != null;
}
- private void unpackFirstPageReader() throws IOException {
- initFirstPageReader();
- if (firstPageReader != null) {
- long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false);
- unpackAllOverlappedChunkMetadataToPageReaders(endpointTime, false);
- }
- }
-
private boolean isExistOverlappedPage() throws IOException {
if (firstPageOverlapped()) {
/*
@@ -756,17 +746,6 @@ public class SeriesReader {
cachedBatchData =
BatchDataFactory.createBatchData(dataType, orderUtils.getAscending(), true);
long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
- // get data as small as possible
- if (firstPageReader != null) {
- currentPageEndPointTime =
- orderUtils.getCurrentEndPoint(
- currentPageEndPointTime, firstPageReader.getStatistics());
- }
- if (!seqPageReaders.isEmpty()) {
- currentPageEndPointTime =
- orderUtils.getCurrentEndPoint(
- currentPageEndPointTime, seqPageReaders.get(0).getStatistics());
- }
while (mergeReader.hasNextTimeValuePair()) {
/*
@@ -915,17 +894,6 @@ public class SeriesReader {
return;
}
- long currentPageEndpointTime;
-
- if (!unSeqPageReaders.isEmpty()) {
- currentPageEndpointTime =
- orderUtils.getOverlapCheckTime(unSeqPageReaders.peek().getStatistics());
- // unpack all overlapped data for first unseq page
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(currentPageEndpointTime);
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(currentPageEndpointTime, false);
- unpackAllOverlappedChunkMetadataToPageReaders(currentPageEndpointTime, false);
- }
-
/*
* init firstPageReader
*/
@@ -933,36 +901,64 @@ public class SeriesReader {
initFirstPageReader();
}
+ long currentPageEndpointTime;
if (mergeReader.hasNextTimeValuePair()) {
currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
} else {
currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
}
- // unpack all overlapped data for currentPageEndpointTime
- unpackAllOverlappedTsFilesToTimeSeriesMetadata(currentPageEndpointTime);
- unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(currentPageEndpointTime, false);
- unpackAllOverlappedChunkMetadataToPageReaders(currentPageEndpointTime, false);
-
/*
* put all currently directly overlapped unseq page reader to merge reader
*/
unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
}
- private void initFirstPageReader() {
+ private void initFirstPageReader() throws IOException {
+ while (this.firstPageReader == null) {
+ VersionPageReader firstPageReader = getFirstPageReaderFromCachedReaders();
+
+ // unpack overlapped page using current page reader
+ if (firstPageReader != null) {
+ long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+ unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime);
+ unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false);
+ unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false);
+
+ // this page after unpacking must be the first page
+ if (firstPageReader.equals(getFirstPageReaderFromCachedReaders())) {
+ this.firstPageReader = firstPageReader;
+ if (!seqPageReaders.isEmpty() && firstPageReader.equals(seqPageReaders.get(0))) {
+ seqPageReaders.remove(0);
+ break;
+ } else if (!unSeqPageReaders.isEmpty()
+ && firstPageReader.equals(unSeqPageReaders.peek())) {
+ unSeqPageReaders.poll();
+ break;
+ }
+ }
+ } else {
+ return;
+ }
+ }
+ }
+
+ // Use get() and peek() (not remove()/poll()): the candidate may no longer be the first page reader after unpacking
+ private VersionPageReader getFirstPageReaderFromCachedReaders() {
+ VersionPageReader firstPageReader = null;
if (!seqPageReaders.isEmpty() && !unSeqPageReaders.isEmpty()) {
if (orderUtils.isTakeSeqAsFirst(
seqPageReaders.get(0).getStatistics(), unSeqPageReaders.peek().getStatistics())) {
- firstPageReader = seqPageReaders.remove(0);
+ firstPageReader = seqPageReaders.get(0);
} else {
- firstPageReader = unSeqPageReaders.poll();
+ firstPageReader = unSeqPageReaders.peek();
}
} else if (!seqPageReaders.isEmpty()) {
- firstPageReader = seqPageReaders.remove(0);
+ firstPageReader = seqPageReaders.get(0);
} else if (!unSeqPageReaders.isEmpty()) {
- firstPageReader = unSeqPageReaders.poll();
+ firstPageReader = unSeqPageReaders.peek();
}
+ return firstPageReader;
}
private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)