You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/01 10:06:52 UTC

[incubator-iotdb] branch optimize_series_reader updated: combine nextPage (#861)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch optimize_series_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/optimize_series_reader by this push:
     new 1dadf07  combine nextPage (#861)
1dadf07 is described below

commit 1dadf07efb7f81a048879ee0066f671a7a89bf74
Author: Dawei Liu <at...@163.com>
AuthorDate: Sun Mar 1 18:06:42 2020 +0800

    combine nextPage (#861)
    
    * fix valueFilterBug for nextOverlappedPage
---
 .../reader/series/SeriesRawDataBatchReader.java    | 64 ++++++++++++---------
 .../iotdb/db/query/reader/series/SeriesReader.java | 23 +++++---
 .../reader/series/SeriesReaderByTimestamp.java     | 65 +++++++++++-----------
 3 files changed, 85 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
index d1e6ea5..553fdf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java
@@ -69,41 +69,24 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
     if (hasCachedBatchData) {
       return true;
     }
-
     /*
      * consume overlapped data firstly
      */
-    if (seriesReader.hasNextOverlappedPage()) {
-      batchData = seriesReader.nextOverlappedPage();
-      hasCachedBatchData = true;
-      return true;
+    if (readOverlappedPage()) {
+      return hasCachedBatchData = true;
     }
-
-
     /*
-     * consume pages secondly
+     * consume page data secondly
      */
-    if (seriesReader.hasNextPage()) {
-      if (!seriesReader.isPageOverlapped()) {
-        batchData = seriesReader.nextPage();
-      } else if (seriesReader.hasNextOverlappedPage()) {
-        batchData = seriesReader.nextOverlappedPage();
-      }
-      hasCachedBatchData = true;
-      return true;
+    if (readPageData()) {
+      return hasCachedBatchData = true;
     }
-
     /*
      * consume next chunk finally
      */
-    if (seriesReader.hasNextChunk()) {
-      if (seriesReader.hasNextPage()) {
-        if (!seriesReader.isPageOverlapped()) {
-          batchData = seriesReader.nextPage();
-        } else if (seriesReader.hasNextOverlappedPage()) {
-          batchData = seriesReader.nextOverlappedPage();
-        }
-        hasCachedBatchData = true;
+    while (seriesReader.hasNextChunk()) {
+      if (readPageData()) {
+        return hasCachedBatchData = true;
       }
     }
     return hasCachedBatchData;
@@ -124,7 +107,6 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
     //no resources need to close
   }
 
-
   @Override
   public boolean isManagedByQueryManager() {
     return managedByQueryManager;
@@ -145,4 +127,34 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader {
     this.hasRemaining = hasRemaining;
   }
 
+
+  private boolean readPageData() throws IOException {
+    while (seriesReader.hasNextPage()) {
+      if (!seriesReader.isPageOverlapped()) {
+        batchData = seriesReader.nextPage();
+        if (!isEmpty(batchData)) {
+          return true;
+        }
+        continue;
+      }
+      if (readOverlappedPage()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readOverlappedPage() throws IOException {
+    while (seriesReader.hasNextOverlappedPage()) {
+      batchData = seriesReader.nextOverlappedPage();
+      if (!isEmpty(batchData)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isEmpty(BatchData batchData) {
+    return batchData == null || !batchData.hasCurrent();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 7ef65e4..7b5fc3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -121,9 +121,11 @@ public class SeriesReader {
 
   public boolean hasNextChunk() throws IOException {
 
-    if (!cachedPageReaders.isEmpty() || firstPageReader != null || mergeReader.hasNextTimeValuePair()) {
+    if (!cachedPageReaders.isEmpty() || firstPageReader != null || mergeReader
+        .hasNextTimeValuePair()) {
       throw new IOException("all cached pages should be consumed first");
     }
+    firstChunkMetaData = null;
 
     // init first chunk metadata whose startTime is minimum
     tryToUnpackAllOverlappedFilesToChunkMetadatas();
@@ -149,8 +151,8 @@ public class SeriesReader {
   }
 
   /**
-   * This method should be called after hasNextChunk()
-   * make sure that all overlapped pages are consumed before
+   * This method should be called after hasNextChunk() make sure that all overlapped pages are
+   * consumed before
    */
   public boolean hasNextPage() throws IOException {
     if (mergeReader.hasNextTimeValuePair()) {
@@ -182,7 +184,8 @@ public class SeriesReader {
   }
 
 
-  private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long endTime) throws IOException {
+  private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long endTime)
+      throws IOException {
     while (!seqChunkMetadatas.isEmpty() && endTime >= seqChunkMetadatas.get(0).getStartTime()) {
       unpackOneChunkMetaData(seqChunkMetadatas.remove(0));
     }
@@ -233,7 +236,7 @@ public class SeriesReader {
     }
 
     BatchData pageData = firstPageReader.data.getAllSatisfiedPageData();
-
+    firstPageReader = null;
     /*
      * no value filter
      * only need to consider valueFilter because timeFilter has been set into the page reader
@@ -293,8 +296,11 @@ public class SeriesReader {
         unpackAllOverlappedChunkMetadataToCachedPageReaders(timeValuePair.getTimestamp());
         unpackAllOverlappedCachedPageReadersToMergeReader(timeValuePair.getTimestamp());
 
-        cachedBatchData.putAnObject(
-            timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
+        if (valueFilter == null || valueFilter
+            .satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+          cachedBatchData.putAnObject(
+              timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
+        }
 
         mergeReader.nextTimeValuePair();
 
@@ -433,9 +439,8 @@ public class SeriesReader {
 
 
   /**
-   *
    * unpack all overlapped seq/unseq files and find the first chunk metadata
-   *
+   * <p>
    * Because there may be too many files in the scenario used by the user, we cannot open all the
    * chunks at once, which may cause OOM, so we can only unpack one file at a time when needed. This
    * approach is likely to be ubiquitous, but it keeps the system running smoothly
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index a464d32..d5eb43c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -65,16 +65,29 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
     /*
      * consume overlapped data firstly
      */
-    while (seriesReader.hasNextOverlappedPage()) {
-      batchData = seriesReader.nextOverlappedPage();
-      if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
-        return true;
-      }
+    if (readOverlappedPage(timestamp)) {
+      return true;
     }
 
     /*
      * consume pages secondly
      */
+    if (readPageData(timestamp)) {
+      return true;
+    }
+
+    /*
+     * consume next chunk
+     */
+    while (seriesReader.hasNextChunk()) {
+      if (readPageData(timestamp)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readPageData(long timestamp) throws IOException {
     while (seriesReader.hasNextPage()) {
       if (!seriesReader.isPageOverlapped()) {
         if (!satisfyTimeFilter(seriesReader.currentPageStatistics())) {
@@ -87,36 +100,21 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
           }
         }
       }
-      while (seriesReader.hasNextOverlappedPage()) {
-        batchData = seriesReader.nextOverlappedPage();
-        if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
-          return true;
-        }
+      if (readOverlappedPage(timestamp)) {
+        return true;
       }
     }
+    return false;
+  }
 
-    /*
-     * consume next chunk
-     */
-    while (seriesReader.hasNextChunk()) {
-      while (seriesReader.hasNextPage()) {
-        if (!seriesReader.isPageOverlapped()) {
-          if (!satisfyTimeFilter(seriesReader.currentPageStatistics())) {
-            seriesReader.skipCurrentPage();
-            continue;
-          } else {
-            batchData = seriesReader.nextPage();
-            if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
-              return true;
-            }
-          }
-        }
-        while (seriesReader.hasNextOverlappedPage()) {
-          batchData = seriesReader.nextOverlappedPage();
-          if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
-            return true;
-          }
-        }
+  private boolean readOverlappedPage(long timestamp) throws IOException {
+    while (seriesReader.hasNextOverlappedPage()) {
+      batchData = seriesReader.nextOverlappedPage();
+      if (isEmpty(batchData)) {
+        continue;
+      }
+      if (batchData.getTimeByIndex(batchData.length() - 1) >= timestamp) {
+        return true;
       }
     }
     return false;
@@ -126,4 +124,7 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
     return seriesReader.getTimeFilter().satisfy(statistics);
   }
 
+  private boolean isEmpty(BatchData batchData) {
+    return batchData == null || !batchData.hasCurrent();
+  }
 }