You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/12/07 08:34:05 UTC
[iotdb] branch master updated: [IOTDB-5130]Accelerate the compaction of nonOverlap points in overlap pages (#8357)
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a0b2c8cc42 [IOTDB-5130]Accelerate the compaction of nonOverlap points in overlap pages (#8357)
a0b2c8cc42 is described below
commit a0b2c8cc42084fbe293f0a02a5ffdd9e6ba8ab3b
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Wed Dec 7 16:33:58 2022 +0800
[IOTDB-5130]Accelerate the compaction of nonOverlap points in overlap pages (#8357)
---
.../compaction/cross/utils/PointElement.java | 2 +-
.../compaction/reader/PointPriorityReader.java | 81 ++++++++++++++++------
2 files changed, 61 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java
index 65b4b232fe..19fe7301f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java
@@ -38,7 +38,7 @@ public class PointElement {
} else {
this.pointReader = pageElement.batchData.getTsBlockAlignedRowIterator();
}
- this.timeValuePair = pointReader.currentTimeValuePair();
+ this.timeValuePair = pointReader.nextTimeValuePair();
this.timestamp = timeValuePair.getTimestamp();
this.priority = pageElement.priority;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java
index 72b33628a6..433602d787 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java
@@ -46,7 +46,11 @@ public class PointPriorityReader {
private TimeValuePair currentPoint;
- private boolean shouldReadNextPoint = true;
+ private boolean shouldReadPointFromQueue = true;
+
+ private long nextPageStartTime = Long.MAX_VALUE;
+
+ private PointElement currentPointElement;
public PointPriorityReader(SeriesCompactionExecutor.RemovePage removePage) {
this.removePage = removePage;
@@ -59,8 +63,9 @@ public class PointPriorityReader {
}
public TimeValuePair currentPoint() {
- if (shouldReadNextPoint) {
- // get the highest priority point
+ if (shouldReadPointFromQueue && currentPointElement == null) {
+ // if the current point is overlapped with other pages, then get the highest priority point
+ // from queue
currentPoint = pointQueue.peek().timeValuePair;
lastTime = currentPoint.getTimestamp();
@@ -69,7 +74,7 @@ public class PointPriorityReader {
fillAlignedNullValue();
}
- shouldReadNextPoint = false;
+ shouldReadPointFromQueue = false;
}
return currentPoint;
}
@@ -87,7 +92,7 @@ public class PointPriorityReader {
int nullValueNum = currentValues.length;
while (!pointQueue.isEmpty()) {
if (pointQueue.peek().timestamp > lastTime) {
- // the smallest time of all pages is later then the last time, then break the loop
+ // the smallest time of all pages is later than the last time, then break the loop
break;
} else {
// find the data points in other pages that has the same timestamp
@@ -118,35 +123,69 @@ public class PointPriorityReader {
}
public void next() throws IllegalPathException, IOException, WriteProcessException {
- // remove data points with the same timestamp as the last point
- while (!pointQueue.isEmpty()) {
- if (pointQueue.peek().timestamp > lastTime) {
- // the smallest time of all pages is later than the last time, then break the loop
- break;
+ if (currentPointElement != null) {
+ IPointReader pointReader = currentPointElement.pointReader;
+ if (pointReader.hasNextTimeValuePair()) {
+ // get the point directly if it is not overlapped with other points
+ currentPoint = pointReader.nextTimeValuePair();
+ if (currentPoint.getTimestamp() >= nextPageStartTime) {
+ // if the point is overlapped with other points, then add it into priority queue
+ currentPointElement.setPoint(currentPoint);
+ pointQueue.add(currentPointElement);
+ currentPointElement = null;
+ }
} else {
- // find the data points in other pages that has the same timestamp
- PointElement pointElement = pointQueue.poll();
- IPointReader pointReader = pointElement.pointReader;
- if (pointReader.hasNextTimeValuePair()) {
- pointElement.setPoint(pointReader.nextTimeValuePair());
- pointQueue.add(pointElement);
+ // end page
+ PageElement pageElement = currentPointElement.pageElement;
+ currentPointElement = null;
+ removePage.call(pageElement);
+ }
+ } else {
+ // remove data points with the same timestamp as the last point
+ while (!pointQueue.isEmpty()) {
+ if (pointQueue.peek().timestamp > lastTime) {
+ // the smallest time of all pages is later than the last time, then break the loop
+ break;
} else {
- // end page
- removePage.call(pointElement.pageElement);
+ // find the data points in other pages that has the same timestamp
+ PointElement pointElement = pointQueue.poll();
+ IPointReader pointReader = pointElement.pointReader;
+ if (pointReader.hasNextTimeValuePair()) {
+ pointElement.setPoint(pointReader.nextTimeValuePair());
+ nextPageStartTime =
+ pointQueue.size() > 0 ? pointQueue.peek().pageElement.startTime : Long.MAX_VALUE;
+ if (pointElement.timestamp < nextPageStartTime) {
+ currentPointElement = pointElement;
+ currentPoint = currentPointElement.timeValuePair;
+ } else {
+ pointQueue.add(pointElement);
+ }
+ } else {
+ // end page
+ removePage.call(pointElement.pageElement);
+ }
}
}
}
- shouldReadNextPoint = true;
+ shouldReadPointFromQueue = true;
}
public boolean hasNext() {
- return !pointQueue.isEmpty();
+ return currentPointElement != null || !pointQueue.isEmpty();
}
/** Add a new overlapped page. */
public void addNewPage(PageElement pageElement) throws IOException {
+ if (currentPointElement != null) {
+ nextPageStartTime = Math.min(nextPageStartTime, pageElement.startTime);
+ if (currentPoint.getTimestamp() >= nextPageStartTime) {
+ currentPointElement.setPoint(currentPoint);
+ pointQueue.add(currentPointElement);
+ currentPointElement = null;
+ }
+ }
pageElement.deserializePage();
pointQueue.add(new PointElement(pageElement));
- shouldReadNextPoint = true;
+ shouldReadPointFromQueue = true;
}
}