You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/28 01:06:57 UTC

[iotdb] branch master updated: [IOTDB-2445] [IOTDB-2502] Fix overlapped data should be consumed firstly bug (#4990)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 56c0d7c  [IOTDB-2445] [IOTDB-2502] Fix overlapped data should be consumed firstly bug (#4990)
56c0d7c is described below

commit 56c0d7c7b33a5718a0b72bd47477e4ee10aac744
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Jan 28 09:05:54 2022 +0800

    [IOTDB-2445] [IOTDB-2502] Fix overlapped data should be consumed firstly bug (#4990)
---
 .../iotdb/db/query/reader/series/SeriesReader.java | 84 +++++++++++-----------
 1 file changed, 40 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index e9d7bc8..02058aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -500,7 +500,7 @@ public class SeriesReader {
       /*
        * first chunk metadata is already unpacked, consume cached pages
        */
-      unpackFirstPageReader();
+      initFirstPageReader();
     }
 
     if (isExistOverlappedPage()) {
@@ -511,7 +511,7 @@ public class SeriesReader {
     // readers
     while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
 
-      unpackFirstPageReader();
+      initFirstPageReader();
 
       if (isExistOverlappedPage()) {
         return true;
@@ -520,16 +520,6 @@ public class SeriesReader {
     return firstPageReader != null;
   }
 
-  private void unpackFirstPageReader() throws IOException {
-    initFirstPageReader();
-    if (firstPageReader != null) {
-      long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
-      unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
-      unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false);
-      unpackAllOverlappedChunkMetadataToPageReaders(endpointTime, false);
-    }
-  }
-
   private boolean isExistOverlappedPage() throws IOException {
     if (firstPageOverlapped()) {
       /*
@@ -756,17 +746,6 @@ public class SeriesReader {
         cachedBatchData =
             BatchDataFactory.createBatchData(dataType, orderUtils.getAscending(), true);
         long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
-        // get data as small as possible
-        if (firstPageReader != null) {
-          currentPageEndPointTime =
-              orderUtils.getCurrentEndPoint(
-                  currentPageEndPointTime, firstPageReader.getStatistics());
-        }
-        if (!seqPageReaders.isEmpty()) {
-          currentPageEndPointTime =
-              orderUtils.getCurrentEndPoint(
-                  currentPageEndPointTime, seqPageReaders.get(0).getStatistics());
-        }
         while (mergeReader.hasNextTimeValuePair()) {
 
           /*
@@ -915,17 +894,6 @@ public class SeriesReader {
       return;
     }
 
-    long currentPageEndpointTime;
-
-    if (!unSeqPageReaders.isEmpty()) {
-      currentPageEndpointTime =
-          orderUtils.getOverlapCheckTime(unSeqPageReaders.peek().getStatistics());
-      // unpack all overlapped data for first unseq page
-      unpackAllOverlappedTsFilesToTimeSeriesMetadata(currentPageEndpointTime);
-      unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(currentPageEndpointTime, false);
-      unpackAllOverlappedChunkMetadataToPageReaders(currentPageEndpointTime, false);
-    }
-
     /*
      * init firstPageReader
      */
@@ -933,36 +901,64 @@ public class SeriesReader {
       initFirstPageReader();
     }
 
+    long currentPageEndpointTime;
     if (mergeReader.hasNextTimeValuePair()) {
       currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
     } else {
       currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
     }
 
-    // unpack all overlapped data for currentPageEndpointTime
-    unpackAllOverlappedTsFilesToTimeSeriesMetadata(currentPageEndpointTime);
-    unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(currentPageEndpointTime, false);
-    unpackAllOverlappedChunkMetadataToPageReaders(currentPageEndpointTime, false);
-
     /*
      * put all currently directly overlapped unseq page reader to merge reader
      */
     unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
   }
 
-  private void initFirstPageReader() {
+  private void initFirstPageReader() throws IOException {
+    while (this.firstPageReader == null) {
+      VersionPageReader firstPageReader = getFirstPageReaderFromCachedReaders();
+
+      // unpack overlapped page using current page reader
+      if (firstPageReader != null) {
+        long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
+        unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime);
+        unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false);
+        unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false);
+
+        // accept the candidate only if it is still the first page after unpacking
+        if (firstPageReader.equals(getFirstPageReaderFromCachedReaders())) {
+          this.firstPageReader = firstPageReader;
+          if (!seqPageReaders.isEmpty() && firstPageReader.equals(seqPageReaders.get(0))) {
+            seqPageReaders.remove(0);
+            break;
+          } else if (!unSeqPageReaders.isEmpty()
+              && firstPageReader.equals(unSeqPageReaders.peek())) {
+            unSeqPageReaders.poll();
+            break;
+          }
+        }
+      } else {
+        return;
+      }
+    }
+  }
+
+  // Peek with get()/peek() instead of removing: the candidate may not stay first after unpacking
+  private VersionPageReader getFirstPageReaderFromCachedReaders() {
+    VersionPageReader firstPageReader = null;
     if (!seqPageReaders.isEmpty() && !unSeqPageReaders.isEmpty()) {
       if (orderUtils.isTakeSeqAsFirst(
           seqPageReaders.get(0).getStatistics(), unSeqPageReaders.peek().getStatistics())) {
-        firstPageReader = seqPageReaders.remove(0);
+        firstPageReader = seqPageReaders.get(0);
       } else {
-        firstPageReader = unSeqPageReaders.poll();
+        firstPageReader = unSeqPageReaders.peek();
       }
     } else if (!seqPageReaders.isEmpty()) {
-      firstPageReader = seqPageReaders.remove(0);
+      firstPageReader = seqPageReaders.get(0);
     } else if (!unSeqPageReaders.isEmpty()) {
-      firstPageReader = unSeqPageReaders.poll();
+      firstPageReader = unSeqPageReaders.peek();
     }
+    return firstPageReader;
   }
 
   private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)