You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/04/12 07:04:15 UTC

[iotdb] branch lmh/groupByTest updated: add more metric about merge reader

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/groupByTest
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/lmh/groupByTest by this push:
     new d0dd744a12 add more metric about merge reader
d0dd744a12 is described below

commit d0dd744a120f301cfc797ca9c86edd927111adc8
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Apr 12 15:04:01 2023 +0800

    add more metric about merge reader
---
 .../execution/operator/source/SeriesScanUtil.java  | 117 +++++++++++----------
 .../iotdb/db/mpp/statistics/QueryStatistics.java   |   6 ++
 .../universal/AlignedPriorityMergeReader.java      |   9 +-
 .../reader/universal/PriorityMergeReader.java      | 109 +++++++++----------
 4 files changed, 132 insertions(+), 109 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 4d5fa8a243..70a540070e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -794,66 +794,73 @@ public class SeriesScanUtil {
              */
             timeValuePair = mergeReader.nextTimeValuePair();
 
-            Object valueForFilter = timeValuePair.getValue().getValue();
-
-            // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will
-            // only accept AlignedPath with only one sub sensor
-            if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
-              for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
-                if (tsPrimitiveType != null) {
-                  valueForFilter = tsPrimitiveType.getValue();
-                  break;
+            long st = System.nanoTime();
+            try {
+              Object valueForFilter = timeValuePair.getValue().getValue();
+
+              // TODO fix value filter firstNotNullObject. Currently, if it's a value filter,
+              // it will only accept an AlignedPath
+              // with only one sub sensor.
+              if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
+                for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) {
+                  if (tsPrimitiveType != null) {
+                    valueForFilter = tsPrimitiveType.getValue();
+                    break;
+                  }
                 }
               }
-            }
 
-            Filter queryFilter = scanOptions.getQueryFilter();
-            if (queryFilter != null
-                && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
-              continue;
-            }
-            if (paginationController.hasCurOffset()) {
-              paginationController.consumeOffset();
-              continue;
-            }
-            if (paginationController.hasCurLimit()) {
-              timeBuilder.writeLong(timeValuePair.getTimestamp());
-              switch (dataType) {
-                case BOOLEAN:
-                  builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
-                  break;
-                case INT32:
-                  builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
-                  break;
-                case INT64:
-                  builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
-                  break;
-                case FLOAT:
-                  builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
-                  break;
-                case DOUBLE:
-                  builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
-                  break;
-                case TEXT:
-                  builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
-                  break;
-                case VECTOR:
-                  TsPrimitiveType[] values = timeValuePair.getValue().getVector();
-                  for (int i = 0; i < values.length; i++) {
-                    if (values[i] == null) {
-                      builder.getColumnBuilder(i).appendNull();
-                    } else {
-                      builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+              Filter queryFilter = scanOptions.getQueryFilter();
+              if (queryFilter != null
+                  && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) {
+                continue;
+              }
+              if (paginationController.hasCurOffset()) {
+                paginationController.consumeOffset();
+                continue;
+              }
+              if (paginationController.hasCurLimit()) {
+                timeBuilder.writeLong(timeValuePair.getTimestamp());
+                switch (dataType) {
+                  case BOOLEAN:
+                    builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
+                    break;
+                  case INT32:
+                    builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
+                    break;
+                  case INT64:
+                    builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
+                    break;
+                  case FLOAT:
+                    builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
+                    break;
+                  case DOUBLE:
+                    builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
+                    break;
+                  case TEXT:
+                    builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
+                    break;
+                  case VECTOR:
+                    TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+                    for (int i = 0; i < values.length; i++) {
+                      if (values[i] == null) {
+                        builder.getColumnBuilder(i).appendNull();
+                      } else {
+                        builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+                      }
                     }
-                  }
-                  break;
-                default:
-                  throw new UnSupportedDataTypeException(String.valueOf(dataType));
+                    break;
+                  default:
+                    throw new UnSupportedDataTypeException(String.valueOf(dataType));
+                }
+                builder.declarePosition();
+                paginationController.consumeLimit();
+              } else {
+                break;
               }
-              builder.declarePosition();
-              paginationController.consumeLimit();
-            } else {
-              break;
+            } finally {
+              QueryStatistics.getInstance()
+                  .addCost(QueryStatistics.MERGE_READER_BUILD_RES, System.nanoTime() - st);
             }
           }
           hasCachedNextOverlappedPage = !builder.isEmpty();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
index 387db02499..8e287e34da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -172,6 +172,12 @@ public class QueryStatistics {
   public static final String PICK_FIRST_TIMESERIES_METADATA = "pickFirstTimeSeriesMetadata";
   public static final String INIT_FIRST_PAGE = "initFirstPage";
 
+  public static final String MERGE_READER_ADD_READER = "mergeReader#addReader";
+  public static final String MERGE_READER_NEXT = "mergeReader#nextTimeValuePair";
+  public static final String MERGE_READER_UPDATE_HEAP = "mergeReader#updateHeap";
+  public static final String MERGE_READER_FILL_NULL_VALUE = "mergeReader#fillNullValue";
+  public static final String MERGE_READER_BUILD_RES = "mergeReader#buildRes";
+
   private QueryStatistics() {
     ScheduledExecutorService scheduledExecutor =
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
index 9d5875abde..801aaf85d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/AlignedPriorityMergeReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.reader.universal;
 
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -35,7 +36,13 @@ public class AlignedPriorityMergeReader extends PriorityMergeReader {
    */
   @Override
   protected void fillNullValue(TimeValuePair v, TimeValuePair c) {
-    fillNullValueInAligned(v, c);
+    long startTime = System.nanoTime();
+    try {
+      fillNullValueInAligned(v, c);
+    } finally {
+      QueryStatistics.getInstance()
+          .addCost(QueryStatistics.MERGE_READER_FILL_NULL_VALUE, System.nanoTime() - startTime);
+    }
   }
 
   static void fillNullValueInAligned(TimeValuePair v, TimeValuePair c) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 471090bb62..475b0d0a28 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -18,12 +18,12 @@
  */
 package org.apache.iotdb.db.query.reader.universal;
 
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 import java.util.PriorityQueue;
 
@@ -47,21 +47,6 @@ public class PriorityMergeReader implements IPointReader {
             });
   }
 
-  // only used in external sort, need to refactor later
-  public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int startPriority)
-      throws IOException {
-    heap =
-        new PriorityQueue<>(
-            (o1, o2) -> {
-              int timeCompare =
-                  Long.compare(o1.timeValuePair.getTimestamp(), o2.timeValuePair.getTimestamp());
-              return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority);
-            });
-    for (IPointReader reader : prioritySeriesReaders) {
-      addReader(reader, startPriority++);
-    }
-  }
-
   public void addReader(IPointReader reader, long priority) throws IOException {
     if (reader.hasNextTimeValuePair()) {
       heap.add(
@@ -74,11 +59,17 @@ public class PriorityMergeReader implements IPointReader {
   public void addReader(
       IPointReader reader, MergeReaderPriority priority, long endTime, QueryContext context)
       throws IOException {
-    if (reader.hasNextTimeValuePair()) {
-      heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
-      currentReadStopTime = Math.max(currentReadStopTime, endTime);
-    } else {
-      reader.close();
+    long startTime = System.nanoTime();
+    try {
+      if (reader.hasNextTimeValuePair()) {
+        heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
+        currentReadStopTime = Math.max(currentReadStopTime, endTime);
+      } else {
+        reader.close();
+      }
+    } finally {
+      QueryStatistics.getInstance()
+          .addCost(QueryStatistics.MERGE_READER_ADD_READER, System.nanoTime() - startTime);
     }
   }
 
@@ -93,19 +84,25 @@ public class PriorityMergeReader implements IPointReader {
 
   @Override
   public TimeValuePair nextTimeValuePair() throws IOException {
-    Element top = heap.poll();
-    TimeValuePair ret = top.getTimeValuePair();
-    TimeValuePair topNext = null;
-    if (top.hasNext()) {
-      top.next();
-      topNext = top.currPair();
-    }
-    updateHeap(ret, topNext);
-    if (topNext != null) {
-      top.timeValuePair = topNext;
-      heap.add(top);
+    long startTime = System.nanoTime();
+    try {
+      Element top = heap.poll();
+      TimeValuePair ret = top.getTimeValuePair();
+      TimeValuePair topNext = null;
+      if (top.hasNext()) {
+        top.next();
+        topNext = top.currPair();
+      }
+      updateHeap(ret, topNext);
+      if (topNext != null) {
+        top.timeValuePair = topNext;
+        heap.add(top);
+      }
+      return ret;
+    } finally {
+      QueryStatistics.getInstance()
+          .addCost(QueryStatistics.MERGE_READER_NEXT, System.nanoTime() - startTime);
     }
-    return ret;
   }
 
   @Override
@@ -119,29 +116,35 @@ public class PriorityMergeReader implements IPointReader {
    * TimeValuePair
    */
   protected void updateHeap(TimeValuePair ret, TimeValuePair topNext) throws IOException {
-    long topTime = ret.getTimestamp();
-    long topNextTime = (topNext == null ? Long.MAX_VALUE : topNext.getTimestamp());
-    while (!heap.isEmpty() && heap.peek().currTime() == topTime) {
-      Element e = heap.poll();
-      fillNullValue(ret, e.getTimeValuePair());
-      if (!e.hasNext()) {
-        e.reader.close();
-        continue;
-      }
-      e.next();
-      if (e.currTime() == topNextTime) {
-        // if the next value of the peek will be overwritten by the next of the top, skip it
-        fillNullValue(topNext, e.getTimeValuePair());
-        if (e.hasNext()) {
-          e.next();
-          heap.add(e);
+    long startTime = System.nanoTime();
+    try {
+      long topTime = ret.getTimestamp();
+      long topNextTime = (topNext == null ? Long.MAX_VALUE : topNext.getTimestamp());
+      while (!heap.isEmpty() && heap.peek().currTime() == topTime) {
+        Element e = heap.poll();
+        fillNullValue(ret, e.getTimeValuePair());
+        if (!e.hasNext()) {
+          e.reader.close();
+          continue;
+        }
+        e.next();
+        if (e.currTime() == topNextTime) {
+          // if the next value of the peek will be overwritten by the next of the top, skip it
+          fillNullValue(topNext, e.getTimeValuePair());
+          if (e.hasNext()) {
+            e.next();
+            heap.add(e);
+          } else {
+            // the chunk has ended
+            e.close();
+          }
         } else {
-          // the chunk is end
-          e.close();
+          heap.add(e);
         }
-      } else {
-        heap.add(e);
       }
+    } finally {
+      QueryStatistics.getInstance()
+          .addCost(QueryStatistics.MERGE_READER_UPDATE_HEAP, System.nanoTime() - startTime);
     }
   }