You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/04/12 07:04:15 UTC
[iotdb] branch lmh/groupByTest updated: add more metric about merge reader
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/groupByTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/lmh/groupByTest by this push:
new d0dd744a12 add more metric about merge reader
d0dd744a12 is described below
commit d0dd744a120f301cfc797ca9c86edd927111adc8
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Apr 12 15:04:01 2023 +0800
add more metric about merge reader
---
.../execution/operator/source/SeriesScanUtil.java | 117 +++++++++++----------
.../iotdb/db/mpp/statistics/QueryStatistics.java | 6 ++
.../universal/AlignedPriorityMergeReader.java | 9 +-
.../reader/universal/PriorityMergeReader.java | 109 +++++++++----------
4 files changed, 132 insertions(+), 109 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 4d5fa8a243..70a540070e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -794,66 +794,73 @@ public class SeriesScanUtil {
*/
timeValuePair = mergeReader.nextTimeValuePair();
- Object valueForFilter = timeValuePair.getValue().getValue();
-
- // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
- // only accept AlignedPath with only one sub sensor
- if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
- for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
- if (tsPrimitiveType != null) {
- valueForFilter = tsPrimitiveType.getValue();
- break;
+ long st = System.nanoTime();
+ try {
+ Object valueForFilter = timeValuePair.getValue().getValue();
+
+ // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it
+ // will
+ // only accept AlignedPath with only one sub sensor
+ if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+ for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+ if (tsPrimitiveType != null) {
+ valueForFilter = tsPrimitiveType.getValue();
+ break;
+ }
}
}
- }
- Filter queryFilter = scanOptions.getQueryFilter();
- if (queryFilter != null
- && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
- continue;
- }
- if (paginationController.hasCurOffset()) {
- paginationController.consumeOffset();
- continue;
- }
- if (paginationController.hasCurLimit()) {
- timeBuilder.writeLong(timeValuePair.getTimestamp());
- switch (dataType) {
- case BOOLEAN:
- builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
- break;
- case INT32:
- builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
- break;
- case INT64:
- builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
- break;
- case FLOAT:
- builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
- break;
- case DOUBLE:
- builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
- break;
- case TEXT:
- builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
- break;
- case VECTOR:
- TsPrimitiveType[] values = timeValuePair.getValue().getVector();
- for (int i = 0; i < values.length; i++) {
- if (values[i] == null) {
- builder.getColumnBuilder(i).appendNull();
- } else {
- builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+ Filter queryFilter = scanOptions.getQueryFilter();
+ if (queryFilter != null
+ && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+ continue;
+ }
+ if (paginationController.hasCurOffset()) {
+ paginationController.consumeOffset();
+ continue;
+ }
+ if (paginationController.hasCurLimit()) {
+ timeBuilder.writeLong(timeValuePair.getTimestamp());
+ switch (dataType) {
+ case BOOLEAN:
+ builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
+ break;
+ case INT32:
+ builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
+ break;
+ case INT64:
+ builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
+ break;
+ case FLOAT:
+ builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
+ break;
+ case DOUBLE:
+ builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
+ break;
+ case TEXT:
+ builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
+ break;
+ case VECTOR:
+ TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+ for (int i = 0; i < values.length; i++) {
+ if (values[i] == null) {
+ builder.getColumnBuilder(i).appendNull();
+ } else {
+ builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+ }
}
- }
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
+ builder.declarePosition();
+ paginationController.consumeLimit();
+ } else {
+ break;
}
- builder.declarePosition();
- paginationController.consumeLimit();
- } else {
- break;
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_BUILD_RES, System.nanoTime() - st);
}
}
hasCachedNextOverlappedPage = !builder.isEmpty();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
index 387db02499..8e287e34da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -172,6 +172,12 @@ public class QueryStatistics {
public static final String PICK_FIRST_TIMESERIES_METADATA = "pickFirstTimeSeriesMetadata";
public static final String INIT_FIRST_PAGE = "initFirstPage";
+ public static final String MERGE_READER_ADD_READER = "mergeReader#addReader";
+ public static final String MERGE_READER_NEXT = "mergeReader#nextTimeValuePair";
+ public static final String MERGE_READER_UPDATE_HEAP = "mergeReader#updateHeap";
+ public static final String MERGE_READER_FILL_NULL_VALUE = "mergeReader#fillNullValue";
+ public static final String MERGE_READER_BUILD_RES = "mergeReader#buildRes";
+
private QueryStatistics() {
ScheduledExecutorService scheduledExecutor =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
index 9d5875abde..801aaf85d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.universal;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -35,7 +36,13 @@ public class AlignedPriorityMergeReader extends PriorityMergeReader {
*/
@Override
protected void fillNullValue(TimeValuePair v, TimeValuePair c) {
- fillNullValueInAligned(v, c);
+ long startTime = System.nanoTime();
+ try {
+ fillNullValueInAligned(v, c);
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_FILL_NULL_VALUE, System.nanoTime() - startTime);
+ }
}
static void fillNullValueInAligned(TimeValuePair v, TimeValuePair c) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 471090bb62..475b0d0a28 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -18,12 +18,12 @@
*/
package org.apache.iotdb.db.query.reader.universal;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import java.io.IOException;
-import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
@@ -47,21 +47,6 @@ public class PriorityMergeReader implements IPointReader {
});
}
- // only used in external sort, need to refactor later
- public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int startPriority)
- throws IOException {
- heap =
- new PriorityQueue<>(
- (o1, o2) -> {
- int timeCompare =
- Long.compare(o1.timeValuePair.getTimestamp(), o2.timeValuePair.getTimestamp());
- return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority);
- });
- for (IPointReader reader : prioritySeriesReaders) {
- addReader(reader, startPriority++);
- }
- }
-
public void addReader(IPointReader reader, long priority) throws IOException {
if (reader.hasNextTimeValuePair()) {
heap.add(
@@ -74,11 +59,17 @@ public class PriorityMergeReader implements IPointReader {
public void addReader(
IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext context)
throws IOException {
- if (reader.hasNextTimeValuePair()) {
- heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
- currentReadStopTime = Math.max(currentReadStopTime, endTime);
- } else {
- reader.close();
+ long startTime = System.nanoTime();
+ try {
+ if (reader.hasNextTimeValuePair()) {
+ heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
+ currentReadStopTime = Math.max(currentReadStopTime, endTime);
+ } else {
+ reader.close();
+ }
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_ADD_READER, System.nanoTime() - startTime);
}
}
@@ -93,19 +84,25 @@ public class PriorityMergeReader implements IPointReader {
@Override
public TimeValuePair nextTimeValuePair() throws IOException {
- Element top = heap.poll();
- TimeValuePair ret = top.getTimeValuePair();
- TimeValuePair topNext = null;
- if (top.hasNext()) {
- top.next();
- topNext = top.currPair();
- }
- updateHeap(ret, topNext);
- if (topNext != null) {
- top.timeValuePair = topNext;
- heap.add(top);
+ long startTime = System.nanoTime();
+ try {
+ Element top = heap.poll();
+ TimeValuePair ret = top.getTimeValuePair();
+ TimeValuePair topNext = null;
+ if (top.hasNext()) {
+ top.next();
+ topNext = top.currPair();
+ }
+ updateHeap(ret, topNext);
+ if (topNext != null) {
+ top.timeValuePair = topNext;
+ heap.add(top);
+ }
+ return ret;
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_NEXT, System.nanoTime() - startTime);
}
- return ret;
}
@Override
@@ -119,29 +116,35 @@ public class PriorityMergeReader implements IPointReader {
* TimeValuePair
*/
protected void updateHeap(TimeValuePair ret, TimeValuePair topNext) throws IOException {
- long topTime = ret.getTimestamp();
- long topNextTime = (topNext == null ? Long.MAX_VALUE : topNext.getTimestamp());
- while (!heap.isEmpty() && heap.peek().currTime() == topTime) {
- Element e = heap.poll();
- fillNullValue(ret, e.getTimeValuePair());
- if (!e.hasNext()) {
- e.reader.close();
- continue;
- }
- e.next();
- if (e.currTime() == topNextTime) {
- // if the next value of the peek will be overwritten by the next of the top, skip it
- fillNullValue(topNext, e.getTimeValuePair());
- if (e.hasNext()) {
- e.next();
- heap.add(e);
+ long startTime = System.nanoTime();
+ try {
+ long topTime = ret.getTimestamp();
+ long topNextTime = (topNext == null ? Long.MAX_VALUE : topNext.getTimestamp());
+ while (!heap.isEmpty() && heap.peek().currTime() == topTime) {
+ Element e = heap.poll();
+ fillNullValue(ret, e.getTimeValuePair());
+ if (!e.hasNext()) {
+ e.reader.close();
+ continue;
+ }
+ e.next();
+ if (e.currTime() == topNextTime) {
+ // if the next value of the peek will be overwritten by the next of the top, skip it
+ fillNullValue(topNext, e.getTimeValuePair());
+ if (e.hasNext()) {
+ e.next();
+ heap.add(e);
+ } else {
+ // the chunk is end
+ e.close();
+ }
} else {
- // the chunk is end
- e.close();
+ heap.add(e);
}
- } else {
- heap.add(e);
}
+ } finally {
+ QueryStatistics.getInstance()
+ .addCost(QueryStatistics.MERGE_READER_UPDATE_HEAP, System.nanoTime() - startTime);
}
}