You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/12/07 08:34:05 UTC

[iotdb] branch master updated: [IOTDB-5130]Accelerate the compaction of nonOverlap points in overlap pages (#8357)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a0b2c8cc42 [IOTDB-5130]Accelerate the compaction of nonOverlap points in overlap pages (#8357)
a0b2c8cc42 is described below

commit a0b2c8cc42084fbe293f0a02a5ffdd9e6ba8ab3b
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Wed Dec 7 16:33:58 2022 +0800

    [IOTDB-5130]Accelerate the compaction of nonOverlap points in overlap pages (#8357)
---
 .../compaction/cross/utils/PointElement.java       |  2 +-
 .../compaction/reader/PointPriorityReader.java     | 81 ++++++++++++++++------
 2 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java
index 65b4b232fe..19fe7301f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java
@@ -38,7 +38,7 @@ public class PointElement {
     } else {
       this.pointReader = pageElement.batchData.getTsBlockAlignedRowIterator();
     }
-    this.timeValuePair = pointReader.currentTimeValuePair();
+    this.timeValuePair = pointReader.nextTimeValuePair();
     this.timestamp = timeValuePair.getTimestamp();
     this.priority = pageElement.priority;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java
index 72b33628a6..433602d787 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java
@@ -46,7 +46,11 @@ public class PointPriorityReader {
 
   private TimeValuePair currentPoint;
 
-  private boolean shouldReadNextPoint = true;
+  private boolean shouldReadPointFromQueue = true;
+
+  private long nextPageStartTime = Long.MAX_VALUE;
+
+  private PointElement currentPointElement;
 
   public PointPriorityReader(SeriesCompactionExecutor.RemovePage removePage) {
     this.removePage = removePage;
@@ -59,8 +63,9 @@ public class PointPriorityReader {
   }
 
   public TimeValuePair currentPoint() {
-    if (shouldReadNextPoint) {
-      // get the highest priority point
+    if (shouldReadPointFromQueue && currentPointElement == null) {
+      // if the current point is overlapped with other pages, then get the highest priority point
+      // from queue
       currentPoint = pointQueue.peek().timeValuePair;
       lastTime = currentPoint.getTimestamp();
 
@@ -69,7 +74,7 @@ public class PointPriorityReader {
         fillAlignedNullValue();
       }
 
-      shouldReadNextPoint = false;
+      shouldReadPointFromQueue = false;
     }
     return currentPoint;
   }
@@ -87,7 +92,7 @@ public class PointPriorityReader {
     int nullValueNum = currentValues.length;
     while (!pointQueue.isEmpty()) {
       if (pointQueue.peek().timestamp > lastTime) {
-        // the smallest time of all pages is later then the last time, then break the loop
+        // the smallest time of all pages is later than the last time, then break the loop
         break;
       } else {
         // find the data points in other pages that has the same timestamp
@@ -118,35 +123,69 @@ public class PointPriorityReader {
   }
 
   public void next() throws IllegalPathException, IOException, WriteProcessException {
-    // remove data points with the same timestamp as the last point
-    while (!pointQueue.isEmpty()) {
-      if (pointQueue.peek().timestamp > lastTime) {
-        // the smallest time of all pages is later than the last time, then break the loop
-        break;
+    if (currentPointElement != null) {
+      IPointReader pointReader = currentPointElement.pointReader;
+      if (pointReader.hasNextTimeValuePair()) {
+        // get the point directly if it is not overlapped with other points
+        currentPoint = pointReader.nextTimeValuePair();
+        if (currentPoint.getTimestamp() >= nextPageStartTime) {
+          // if the point is overlapped with other points, then add it into priority queue
+          currentPointElement.setPoint(currentPoint);
+          pointQueue.add(currentPointElement);
+          currentPointElement = null;
+        }
       } else {
-        // find the data points in other pages that has the same timestamp
-        PointElement pointElement = pointQueue.poll();
-        IPointReader pointReader = pointElement.pointReader;
-        if (pointReader.hasNextTimeValuePair()) {
-          pointElement.setPoint(pointReader.nextTimeValuePair());
-          pointQueue.add(pointElement);
+        // end page
+        PageElement pageElement = currentPointElement.pageElement;
+        currentPointElement = null;
+        removePage.call(pageElement);
+      }
+    } else {
+      // remove data points with the same timestamp as the last point
+      while (!pointQueue.isEmpty()) {
+        if (pointQueue.peek().timestamp > lastTime) {
+          // the smallest time of all pages is later than the last time, then break the loop
+          break;
         } else {
-          // end page
-          removePage.call(pointElement.pageElement);
+          // find the data points in other pages that has the same timestamp
+          PointElement pointElement = pointQueue.poll();
+          IPointReader pointReader = pointElement.pointReader;
+          if (pointReader.hasNextTimeValuePair()) {
+            pointElement.setPoint(pointReader.nextTimeValuePair());
+            nextPageStartTime =
+                pointQueue.size() > 0 ? pointQueue.peek().pageElement.startTime : Long.MAX_VALUE;
+            if (pointElement.timestamp < nextPageStartTime) {
+              currentPointElement = pointElement;
+              currentPoint = currentPointElement.timeValuePair;
+            } else {
+              pointQueue.add(pointElement);
+            }
+          } else {
+            // end page
+            removePage.call(pointElement.pageElement);
+          }
         }
       }
     }
-    shouldReadNextPoint = true;
+    shouldReadPointFromQueue = true;
   }
 
   public boolean hasNext() {
-    return !pointQueue.isEmpty();
+    return currentPointElement != null || !pointQueue.isEmpty();
   }
 
   /** Add a new overlapped page. */
   public void addNewPage(PageElement pageElement) throws IOException {
+    if (currentPointElement != null) {
+      nextPageStartTime = Math.min(nextPageStartTime, pageElement.startTime);
+      if (currentPoint.getTimestamp() >= nextPageStartTime) {
+        currentPointElement.setPoint(currentPoint);
+        pointQueue.add(currentPointElement);
+        currentPointElement = null;
+      }
+    }
     pageElement.deserializePage();
     pointQueue.add(new PointElement(pageElement));
-    shouldReadNextPoint = true;
+    shouldReadPointFromQueue = true;
   }
 }