You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/20 09:45:48 UTC

[iotdb] branch QueryMetrics updated (40c026bd65 -> b95dc6e7d1)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch QueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 40c026bd65 delete modification
     new e5d744bd85 fix bug
     new b95dc6e7d1 Add more

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../db/mpp/execution/exchange/LocalSinkHandle.java |  15 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  48 +++--
 .../fragment/FragmentInstanceManager.java          |   8 +
 .../db/mpp/execution/operator/AggregationUtil.java |   2 +-
 .../AbstractSeriesAggregationScanOperator.java     | 193 ++++++++++++---------
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  16 ++
 .../iotdb/db/mpp/statistics/QueryStatistics.java   |  41 ++++-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   8 +
 .../execution/exchange/LocalSourceHandleTest.java  |   2 +-
 .../execution/exchange/SharedTsBlockQueueTest.java |   2 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    |   2 +-
 .../read/common/block/column/BinaryColumn.java     |  23 ++-
 .../read/common/block/column/BooleanColumn.java    |  22 ++-
 .../read/common/block/column/DoubleColumn.java     |  27 ++-
 .../read/common/block/column/FloatColumn.java      |  23 ++-
 .../tsfile/read/common/block/column/IntColumn.java |  23 ++-
 .../read/common/block/column/LongColumn.java       |  23 ++-
 .../read/common/block/column/NullColumn.java       |  11 +-
 .../block/column/RunLengthEncodedColumn.java       |   3 -
 .../read/common/block/column/TimeColumn.java       |  13 +-
 .../iotdb/tsfile/read/filter/GroupByFilter.java    |  14 +-
 .../tsfile/read/reader/page/AlignedPageReader.java |  75 +-------
 .../tsfile/read/reader/page/TimePageReader.java    |  33 ++++
 .../tsfile/read/reader/page/ValuePageReader.java   | 129 ++++++++++++++
 24 files changed, 527 insertions(+), 229 deletions(-)


[iotdb] 01/02: fix bug

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e5d744bd856c2cf92a78fb82d69222ffb788fd30
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Dec 20 12:44:44 2022 +0800

    fix bug
---
 .../db/mpp/execution/operator/AggregationUtil.java |   2 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    |   2 +-
 .../read/common/block/column/BinaryColumn.java     |  23 +++-
 .../read/common/block/column/BooleanColumn.java    |  22 +++-
 .../read/common/block/column/DoubleColumn.java     |  27 ++++-
 .../read/common/block/column/FloatColumn.java      |  23 +++-
 .../tsfile/read/common/block/column/IntColumn.java |  23 +++-
 .../read/common/block/column/LongColumn.java       |  23 +++-
 .../read/common/block/column/NullColumn.java       |  11 +-
 .../block/column/RunLengthEncodedColumn.java       |   3 -
 .../read/common/block/column/TimeColumn.java       |  13 ++-
 .../iotdb/tsfile/read/filter/GroupByFilter.java    |   6 +-
 .../tsfile/read/reader/page/AlignedPageReader.java |  75 +-----------
 .../tsfile/read/reader/page/TimePageReader.java    |  33 ++++++
 .../tsfile/read/reader/page/ValuePageReader.java   | 129 +++++++++++++++++++++
 15 files changed, 298 insertions(+), 117 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 46873b8503..a8f47ff208 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -127,7 +127,7 @@ public class AggregationUtil {
                 ? inputTsBlock.getEndTime() > curTimeRange.getMax()
                 : inputTsBlock.getEndTime() < curTimeRange.getMin());
     return new Pair<>(
-        isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound, inputTsBlock);
+        isTsBlockOutOfBound || isAllAggregatorsHasFinalResult(aggregators), inputTsBlock);
   }
 
   /** Append a row of aggregation results to the result tsBlock. */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 70dad931fb..e04c378de7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -178,7 +178,7 @@ public class TsBlock {
     for (int i = 0; i < subValueColumns.length; i++) {
       subValueColumns[i] = valueColumns[i].subColumn(fromIndex);
     }
-    return new TsBlock(subTimeColumn, subValueColumns);
+    return new TsBlock(false, subTimeColumn.getPositionCount(), subTimeColumn, subValueColumns);
   }
 
   public TsBlock skipFirst() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index 85d910fd88..edbfd7ea58 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -70,6 +70,20 @@ public class BinaryColumn implements Column {
     retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
   }
 
+  BinaryColumn(
+      int arrayOffset,
+      int positionCount,
+      boolean[] valueIsNull,
+      Binary[] values,
+      long retainedSizeInBytes) {
+    this.arrayOffset = arrayOffset;
+    this.positionCount = positionCount;
+    this.values = values;
+    this.valueIsNull = valueIsNull;
+    // TODO we need to sum up all the Binary's retainedSize here
+    this.retainedSizeInBytes = retainedSizeInBytes;
+  }
+
   @Override
   public TSDataType getDataType() {
     return TSDataType.TEXT;
@@ -138,11 +152,12 @@ public class BinaryColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
     return new BinaryColumn(
-        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+        arrayOffset + fromIndex,
+        positionCount - fromIndex,
+        valueIsNull,
+        values,
+        retainedSizeInBytes);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 5dc1d427b6..f72f485cb1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -69,6 +69,19 @@ public class BooleanColumn implements Column {
     retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
   }
 
+  BooleanColumn(
+      int arrayOffset,
+      int positionCount,
+      boolean[] valueIsNull,
+      boolean[] values,
+      long retainedSizeInBytes) {
+    this.arrayOffset = arrayOffset;
+    this.positionCount = positionCount;
+    this.values = values;
+    this.valueIsNull = valueIsNull;
+    this.retainedSizeInBytes = retainedSizeInBytes;
+  }
+
   @Override
   public TSDataType getDataType() {
     return TSDataType.BOOLEAN;
@@ -137,11 +150,12 @@ public class BooleanColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
     return new BooleanColumn(
-        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+        arrayOffset + fromIndex,
+        positionCount - fromIndex,
+        valueIsNull,
+        values,
+        retainedSizeInBytes);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index 9b13800838..695a26429b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -69,6 +69,24 @@ public class DoubleColumn implements Column {
     retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
   }
 
+  DoubleColumn(
+      int arrayOffset,
+      int positionCount,
+      boolean[] valueIsNull,
+      double[] values,
+      long retainedSizeInBytes) {
+
+    this.arrayOffset = arrayOffset;
+
+    this.positionCount = positionCount;
+
+    this.values = values;
+
+    this.valueIsNull = valueIsNull;
+
+    this.retainedSizeInBytes = retainedSizeInBytes;
+  }
+
   @Override
   public TSDataType getDataType() {
     return TSDataType.DOUBLE;
@@ -137,11 +155,12 @@ public class DoubleColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
     return new DoubleColumn(
-        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+        arrayOffset + fromIndex,
+        positionCount - fromIndex,
+        valueIsNull,
+        values,
+        retainedSizeInBytes);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index 4d8f3e0440..ab994230ca 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -68,6 +68,19 @@ public class FloatColumn implements Column {
     retainedSizeInBytes = (INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values));
   }
 
+  FloatColumn(
+      int arrayOffset,
+      int positionCount,
+      boolean[] valueIsNull,
+      float[] values,
+      long retainedSizeInBytes) {
+    this.arrayOffset = arrayOffset;
+    this.positionCount = positionCount;
+    this.values = values;
+    this.valueIsNull = valueIsNull;
+    this.retainedSizeInBytes = retainedSizeInBytes;
+  }
+
   @Override
   public TSDataType getDataType() {
     return TSDataType.FLOAT;
@@ -136,10 +149,12 @@ public class FloatColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
-    return new FloatColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+    return new FloatColumn(
+        arrayOffset + fromIndex,
+        positionCount - fromIndex,
+        valueIsNull,
+        values,
+        retainedSizeInBytes);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 120e4f44da..89094f305f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -68,6 +68,19 @@ public class IntColumn implements Column {
     retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
   }
 
+  IntColumn(
+      int arrayOffset,
+      int positionCount,
+      boolean[] valueIsNull,
+      int[] values,
+      long retainedSizeInBytes) {
+    this.arrayOffset = arrayOffset;
+    this.positionCount = positionCount;
+    this.values = values;
+    this.valueIsNull = valueIsNull;
+    this.retainedSizeInBytes = retainedSizeInBytes;
+  }
+
   @Override
   public TSDataType getDataType() {
     return TSDataType.INT32;
@@ -136,10 +149,12 @@ public class IntColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
-    return new IntColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+    return new IntColumn(
+        arrayOffset + fromIndex,
+        positionCount - fromIndex,
+        valueIsNull,
+        values,
+        retainedSizeInBytes);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index b87b61d2b3..947e8d18fd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -68,6 +68,19 @@ public class LongColumn implements Column {
     retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
   }
 
+  LongColumn(
+      int arrayOffset,
+      int positionCount,
+      boolean[] valueIsNull,
+      long[] values,
+      long retainedSizeInBytes) {
+    this.arrayOffset = arrayOffset;
+    this.positionCount = positionCount;
+    this.values = values;
+    this.valueIsNull = valueIsNull;
+    this.retainedSizeInBytes = retainedSizeInBytes;
+  }
+
   @Override
   public TSDataType getDataType() {
     return TSDataType.INT64;
@@ -136,10 +149,12 @@ public class LongColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
-    return new LongColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+    return new LongColumn(
+        arrayOffset + fromIndex,
+        positionCount - fromIndex,
+        valueIsNull,
+        values,
+        retainedSizeInBytes);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/NullColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/NullColumn.java
index 29a9f8ae6b..1cb946197a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/NullColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/NullColumn.java
@@ -50,13 +50,7 @@ public class NullColumn implements Column {
   }
 
   public NullColumn(int arrayOffset, int positionCount) {
-    if (arrayOffset < 0) {
-      throw new IllegalArgumentException("arrayOffset is negative");
-    }
-    this.arrayOffset = positionCount;
-    if (positionCount < 0) {
-      throw new IllegalArgumentException("positionCount is negative");
-    }
+    this.arrayOffset = arrayOffset;
     this.positionCount = positionCount;
     retainedSizeInBytes = INSTANCE_SIZE;
   }
@@ -104,9 +98,6 @@ public class NullColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
     return new NullColumn(arrayOffset + fromIndex, positionCount - fromIndex);
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index b512b9f252..23c9e7a02d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -198,9 +198,6 @@ public class RunLengthEncodedColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
     return new RunLengthEncodedColumn(value, positionCount - fromIndex);
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index df6232ed9f..348e8ec693 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -59,6 +59,13 @@ public class TimeColumn implements Column {
     retainedSizeInBytes = INSTANCE_SIZE + sizeOf(values);
   }
 
+  TimeColumn(int arrayOffset, int positionCount, long[] values, long retainedSizeInBytes) {
+    this.arrayOffset = arrayOffset;
+    this.positionCount = positionCount;
+    this.values = values;
+    this.retainedSizeInBytes = retainedSizeInBytes;
+  }
+
   @Override
   public TSDataType getDataType() {
     return TSDataType.INT64;
@@ -116,10 +123,8 @@ public class TimeColumn implements Column {
 
   @Override
   public Column subColumn(int fromIndex) {
-    if (fromIndex > positionCount) {
-      throw new IllegalArgumentException("fromIndex is not valid");
-    }
-    return new TimeColumn(arrayOffset + fromIndex, positionCount - fromIndex, values);
+    return new TimeColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, values, retainedSizeInBytes);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
index 851d122bab..72ea2a9f92 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
@@ -55,11 +55,7 @@ public class GroupByFilter implements Filter, Serializable {
 
   @Override
   public boolean satisfy(long time, Object value) {
-    if (time < startTime || time >= endTime) {
-      return false;
-    } else {
-      return (time - startTime) % slidingStep < interval;
-    }
+    return time >= startTime && time < endTime;
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
index 3f076906bb..360441f50e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
@@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 public class AlignedPageReader implements IPageReader, IAlignedPageReader {
@@ -114,80 +113,18 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader {
   }
 
   @Override
-  public TsBlock getAllSatisfiedData() throws IOException {
+  public TsBlock getAllSatisfiedData() {
     builder.reset();
-    long[] timeBatch = timePageReader.getNextTimeBatch();
+    int[] offsetAndLength =
+        timePageReader.nextSatisfiedTimeBatch(filter, builder.getTimeColumnBuilder());
 
-    // if all the sub sensors' value are null in current row, just discard it
-    // if !filter.satisfy, discard this row
-    boolean[] keepCurrentRow = new boolean[timeBatch.length];
-    if (filter == null) {
-      Arrays.fill(keepCurrentRow, true);
-    } else {
-      for (int i = 0, n = timeBatch.length; i < n; i++) {
-        keepCurrentRow[i] = filter.satisfy(timeBatch[i], null);
-      }
-    }
-
-    // using bitMap in valuePageReaders to indicate whether columns of current row are all null.
-    byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1];
-    Arrays.fill(bitmask, (byte) 0x00);
-    boolean[][] isDeleted = new boolean[valueCount][timeBatch.length];
-    for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) {
-      ValuePageReader pageReader = valuePageReaderList.get(columnIndex);
-      if (pageReader != null) {
-        byte[] bitmap = pageReader.getBitmap();
-        pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]);
-
-        for (int i = 0, n = isDeleted[columnIndex].length; i < n; i++) {
-          if (isDeleted[columnIndex][i]) {
-            int shift = i % 8;
-            bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift)));
-          }
-        }
-        for (int i = 0, n = bitmask.length; i < n; i++) {
-          bitmask[i] = (byte) (bitmap[i] | bitmask[i]);
-        }
-      }
-    }
-
-    for (int i = 0, n = bitmask.length; i < n; i++) {
-      if (bitmask[i] == (byte) 0xFF) {
-        // 8 rows are not all null, do nothing
-      } else if (bitmask[i] == (byte) 0x00) {
-        for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
-          keepCurrentRow[i * 8 + j] = false;
-        }
-      } else {
-        for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
-          if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) {
-            keepCurrentRow[i * 8 + j] = false;
-          }
-        }
-      }
-    }
-
-    // construct time column
-    for (int i = 0; i < timeBatch.length; i++) {
-      if (keepCurrentRow[i]) {
-        builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
-        builder.declarePosition();
-      }
-    }
+    builder.declarePositions(offsetAndLength[1]);
 
     // construct value columns
     for (int i = 0; i < valueCount; i++) {
       ValuePageReader pageReader = valuePageReaderList.get(i);
-      if (pageReader != null) {
-        pageReader.writeColumnBuilderWithNextBatch(
-            timeBatch, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]);
-      } else {
-        for (int j = 0; j < timeBatch.length; j++) {
-          if (keepCurrentRow[j]) {
-            builder.getColumnBuilder(i).appendNull();
-          }
-        }
-      }
+      pageReader.writeColumnBuilderWithNextBatch(
+          offsetAndLength[0], offsetAndLength[1], builder.getColumnBuilder(i));
     }
     return builder.build();
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java
index fc05e2b0ba..362d8c54e8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -86,6 +88,37 @@ public class TimePageReader {
     }
   }
 
+  /**
+   * In case that we use sequence read, and the page doesn't have statistics, so we won't know time
+   * array's length at first
+   */
+  public int[] nextSatisfiedTimeBatch(Filter timeFilter, TimeColumnBuilder timeColumnBuilder) {
+    if (timeFilter == null) {
+      for (int i = 0, size = (int) pageHeader.getStatistics().getCount(); i < size; i++) {
+        timeColumnBuilder.writeLong(timeDecoder.readLong(timeBuffer));
+      }
+      return new int[] {0, (int) pageHeader.getStatistics().getCount()};
+    } else {
+      int offset = 0, size = (int) pageHeader.getStatistics().getCount(), totalCount = 1;
+      long current = timeDecoder.readLong(timeBuffer);
+      while (!timeFilter.satisfy(current, null) && totalCount < size) {
+        current = timeDecoder.readLong(timeBuffer);
+        totalCount++;
+        offset++;
+      }
+      if (!timeFilter.satisfy(current, null)) {
+        return new int[] {size, 0};
+      }
+      timeColumnBuilder.writeLong(current);
+      while (totalCount < size
+          && timeFilter.satisfy(current = timeDecoder.readLong(timeBuffer), null)) {
+        timeColumnBuilder.writeLong(current);
+        totalCount++;
+      }
+      return new int[] {offset, totalCount - offset};
+    }
+  }
+
   public TimeStatistics getStatistics() {
     return (TimeStatistics) pageHeader.getStatistics();
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
index 45729ec05e..4e70432858 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/ValuePageReader.java
@@ -328,6 +328,135 @@ public class ValuePageReader {
     }
   }
 
+  public void writeColumnBuilderWithNextBatch(int offset, int length, ColumnBuilder columnBuilder) {
+    if (valueBuffer == null) {
+      for (int i = 0; i < length; i++) {
+        columnBuilder.appendNull();
+      }
+      return;
+    }
+
+    int i = 0;
+    switch (dataType) {
+      case BOOLEAN:
+        // skip offset
+        for (; i < offset; i++) {
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+
+          } else {
+            valueDecoder.readBoolean(valueBuffer);
+          }
+        }
+        // read length
+        for (; i < offset + length; i++) {
+          boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.writeBoolean(aBoolean);
+          }
+        }
+        break;
+      case INT32:
+        // skip offset
+        for (; i < offset; i++) {
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+
+          } else {
+            valueDecoder.readInt(valueBuffer);
+          }
+        }
+        // read length
+        for (; i < offset + length; i++) {
+          int anInt = valueDecoder.readInt(valueBuffer);
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.writeInt(anInt);
+          }
+        }
+        break;
+      case INT64:
+        // skip offset
+        for (; i < offset; i++) {
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+
+          } else {
+            valueDecoder.readLong(valueBuffer);
+          }
+        }
+        // read length
+        for (; i < offset + length; i++) {
+          long aLong = valueDecoder.readLong(valueBuffer);
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.writeLong(aLong);
+          }
+        }
+        break;
+      case FLOAT:
+        // skip offset
+        for (; i < offset; i++) {
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+
+          } else {
+            valueDecoder.readFloat(valueBuffer);
+          }
+        }
+        // read length
+        for (; i < offset + length; i++) {
+          float aFloat = valueDecoder.readFloat(valueBuffer);
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.writeFloat(aFloat);
+          }
+        }
+        break;
+      case DOUBLE:
+        // skip offset
+        for (; i < offset; i++) {
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+
+          } else {
+            valueDecoder.readDouble(valueBuffer);
+          }
+        }
+        // read length
+        for (; i < offset + length; i++) {
+          double aDouble = valueDecoder.readDouble(valueBuffer);
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.writeDouble(aDouble);
+          }
+        }
+        break;
+      case TEXT:
+        // skip offset
+        for (; i < offset; i++) {
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+
+          } else {
+            valueDecoder.readBinary(valueBuffer);
+          }
+        }
+        // read length
+        for (; i < offset + length; i++) {
+          Binary aBinary = valueDecoder.readBinary(valueBuffer);
+          if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.writeBinary(aBinary);
+          }
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(dataType));
+    }
+  }
+
   public Statistics getStatistics() {
     return pageHeader.getStatistics();
   }


[iotdb] 02/02: Add more

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b95dc6e7d123b07c11af3e485a625029383ed057
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Dec 20 17:45:39 2022 +0800

    Add more
---
 .../db/mpp/execution/exchange/LocalSinkHandle.java |  15 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  48 +++--
 .../fragment/FragmentInstanceManager.java          |   8 +
 .../AbstractSeriesAggregationScanOperator.java     | 193 ++++++++++++---------
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  16 ++
 .../iotdb/db/mpp/statistics/QueryStatistics.java   |  41 ++++-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   8 +
 .../execution/exchange/LocalSourceHandleTest.java  |   2 +-
 .../execution/exchange/SharedTsBlockQueueTest.java |   2 +-
 .../iotdb/tsfile/read/filter/GroupByFilter.java    |   8 +-
 10 files changed, 229 insertions(+), 112 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index 91aa31d705..e43052c7e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
@@ -32,9 +33,14 @@ import java.util.List;
 import java.util.Optional;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CHECK_AND_INVOKE_ON_FINISHED;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINK_HANDLE_END_LISTENER;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.SINK_HANDLE_FINISH_LISTENER;
 
 public class LocalSinkHandle implements ISinkHandle {
 
+  private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
   private static final Logger logger = LoggerFactory.getLogger(LocalSinkHandle.class);
 
   private final TFragmentInstanceId remoteFragmentInstanceId;
@@ -92,13 +98,17 @@ public class LocalSinkHandle implements ISinkHandle {
   }
 
   public void checkAndInvokeOnFinished() {
+    long startTime = System.nanoTime();
     synchronized (queue) {
       if (isFinished()) {
         synchronized (this) {
+          long start = System.nanoTime();
           sinkHandleListener.onFinish(this);
+          QUERY_STATISTICS.addCost(SINK_HANDLE_FINISH_LISTENER, System.nanoTime() - start);
         }
       }
     }
+    QUERY_STATISTICS.addCost(CHECK_AND_INVOKE_ON_FINISHED, System.nanoTime() - startTime);
   }
 
   @Override
@@ -135,10 +145,13 @@ public class LocalSinkHandle implements ISinkHandle {
         if (aborted || closed) {
           return;
         }
-        queue.setNoMoreTsBlocks(true);
+        queue.setNoMoreTsBlocks();
+        long startTime = System.nanoTime();
         sinkHandleListener.onEndOfBlocks(this);
+        QUERY_STATISTICS.addCost(SINK_HANDLE_END_LISTENER, System.nanoTime() - startTime);
       }
     }
+
     checkAndInvokeOnFinished();
     logger.debug("[EndSetNoMoreTsBlocksOnLocal]");
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 95e64b1828..b371848767 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -38,11 +39,17 @@ import java.util.Queue;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.FREE_MEM;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NOTIFY_END;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NOTIFY_NEW_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.RESERVE_MEMORY;
 
 /** This is not thread safe class, the caller should ensure multi-threads safety. */
 @NotThreadSafe
 public class SharedTsBlockQueue {
 
+  private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
   private static final Logger logger = LoggerFactory.getLogger(SharedTsBlockQueue.class);
 
   private final TFragmentInstanceId localFragmentInstanceId;
@@ -121,15 +128,17 @@ public class SharedTsBlockQueue {
   }
 
   /** Notify no more tsblocks will be added to the queue. */
-  public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
+  public void setNoMoreTsBlocks() {
     logger.debug("[SignalNoMoreTsBlockOnQueue]");
     if (closed) {
       logger.warn("queue has been destroyed");
       return;
     }
-    this.noMoreTsBlocks = noMoreTsBlocks;
+    this.noMoreTsBlocks = true;
     if (!blocked.isDone()) {
+      long startTime = System.nanoTime();
       blocked.set(null);
+      QUERY_STATISTICS.addCost(NOTIFY_END, System.nanoTime() - startTime);
     }
     if (this.sourceHandle != null) {
       this.sourceHandle.checkAndInvokeOnFinished();
@@ -150,6 +159,7 @@ public class SharedTsBlockQueue {
     if (sinkHandle != null) {
       sinkHandle.checkAndInvokeOnFinished();
     }
+    long startTime = System.nanoTime();
     localMemoryManager
         .getQueryPool()
         .free(
@@ -157,6 +167,8 @@ public class SharedTsBlockQueue {
             localFragmentInstanceId.getInstanceId(),
             localPlanNodeId,
             tsBlock.getRetainedSizeInBytes());
+    QUERY_STATISTICS.addCost(FREE_MEM, System.nanoTime() - startTime);
+
     bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
     if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
       blocked = SettableFuture.create();
@@ -174,19 +186,23 @@ public class SharedTsBlockQueue {
       return immediateVoidFuture();
     }
 
-    Validate.notNull(tsBlock, "TsBlock cannot be null");
-    Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
-    Pair<ListenableFuture<Void>, Boolean> pair =
-        localMemoryManager
-            .getQueryPool()
-            .reserve(
-                localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
-                localPlanNodeId,
-                tsBlock.getRetainedSizeInBytes(),
-                maxBytesCanReserve);
-    blockedOnMemory = pair.left;
-    bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    long startTime = System.nanoTime();
+    Pair<ListenableFuture<Void>, Boolean> pair;
+    try {
+      pair =
+          localMemoryManager
+              .getQueryPool()
+              .reserve(
+                  localFragmentInstanceId.getQueryId(),
+                  localFragmentInstanceId.getInstanceId(),
+                  localPlanNodeId,
+                  tsBlock.getRetainedSizeInBytes(),
+                  maxBytesCanReserve);
+      blockedOnMemory = pair.left;
+      bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    } finally {
+      QUERY_STATISTICS.addCost(RESERVE_MEMORY, System.nanoTime() - startTime);
+    }
 
     // reserve memory failed, we should wait until there is enough memory
     if (!pair.right) {
@@ -203,7 +219,9 @@ public class SharedTsBlockQueue {
     } else { // reserve memory succeeded, add the TsBlock directly
       queue.add(tsBlock);
       if (!blocked.isDone()) {
+        startTime = System.nanoTime();
         blocked.set(null);
+        QUERY_STATISTICS.addCost(NOTIFY_NEW_TSBLOCK, System.nanoTime() - startTime);
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index a502da89f0..7c5a27ba18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -48,6 +48,8 @@ import java.util.concurrent.TimeoutException;
 import static java.util.Objects.requireNonNull;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CREATE_FI_CONTEXT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CREATE_FI_EXEC;
 import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_EXECUTION_PLANNER;
 
 public class FragmentInstanceManager {
@@ -116,6 +118,7 @@ public class FragmentInstanceManager {
                 FragmentInstanceStateMachine stateMachine =
                     new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
 
+                long start = System.nanoTime();
                 FragmentInstanceContext context =
                     instanceContext.computeIfAbsent(
                         instanceId,
@@ -125,6 +128,7 @@ public class FragmentInstanceManager {
                                 stateMachine,
                                 instance.getSessionInfo(),
                                 intoOperationExecutor));
+                QUERY_STATISTICS.addCost(CREATE_FI_CONTEXT, System.nanoTime() - start);
 
                 try {
                   DataDriver driver =
@@ -134,6 +138,8 @@ public class FragmentInstanceManager {
                           context,
                           instance.getTimeFilter(),
                           dataRegion);
+
+                  start = System.nanoTime();
                   return createFragmentInstanceExecution(
                       scheduler,
                       instanceId,
@@ -146,6 +152,8 @@ public class FragmentInstanceManager {
                   logger.warn("error when create FragmentInstanceExecution.", t);
                   stateMachine.failed(t);
                   return null;
+                } finally {
+                  QUERY_STATISTICS.addCost(CREATE_FI_EXEC, System.nanoTime() - start);
                 }
               });
       if (execution != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index c565842d66..aeaf19b3a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -43,6 +43,11 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendA
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
 import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGG_SCAN_OPERATOR;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_CHUNK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_FILE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_PAGE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_RAW_DATA;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_NEXT_AGG_RES;
 
 public abstract class AbstractSeriesAggregationScanOperator implements DataSourceOperator {
 
@@ -154,8 +159,10 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
           aggregator.updateTimeRange(curTimeRange);
         }
 
+        long startTime = System.nanoTime();
         // calculate aggregation result on current time window
         calculateNextAggregationResult();
+        operatorContext.addOperatorTime(CAL_NEXT_AGG_RES, System.nanoTime() - startTime);
       }
 
       if (resultTsBlockBuilder.getPositionCount() > 0) {
@@ -216,8 +223,10 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
   }
 
   private boolean calcFromRawData(TsBlock tsBlock) {
+    long startTime = System.nanoTime();
     Pair<Boolean, TsBlock> calcResult =
         calculateAggregationFromRawData(tsBlock, aggregators, curTimeRange, ascending);
+    operatorContext.addOperatorTime(CAL_AGG_FROM_RAW_DATA, System.nanoTime() - startTime);
     inputTsBlock = calcResult.getRight();
     return calcResult.getLeft();
   }
@@ -233,75 +242,82 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   protected boolean readAndCalcFromFile() throws IOException {
     while (seriesScanUtil.hasNextFile()) {
-      if (canUseCurrentFileStatistics()) {
-        Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics();
-        if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
-          if (ascending) {
-            return true;
-          } else {
+      long startTime = System.nanoTime();
+      try {
+        if (canUseCurrentFileStatistics()) {
+          Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics();
+          if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+            if (ascending) {
+              return true;
+            } else {
+              seriesScanUtil.skipCurrentFile();
+              continue;
+            }
+          }
+          // calc from fileMetaData
+          if (curTimeRange.contains(
+              fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) {
+            Statistics[] statisticsList = new Statistics[subSensorSize];
+            for (int i = 0; i < subSensorSize; i++) {
+              statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
+            }
+            calcFromStatistics(statisticsList);
             seriesScanUtil.skipCurrentFile();
-            continue;
+            if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+              return true;
+            } else {
+              continue;
+            }
           }
         }
-        // calc from fileMetaData
-        if (curTimeRange.contains(
-            fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) {
-          Statistics[] statisticsList = new Statistics[subSensorSize];
-          for (int i = 0; i < subSensorSize; i++) {
-            statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
-          }
-          calcFromStatistics(statisticsList);
-          seriesScanUtil.skipCurrentFile();
-          if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
-            return true;
-          } else {
-            continue;
-          }
+        // read chunk
+        if (readAndCalcFromChunk()) {
+          return true;
         }
-      }
-
-      // read chunk
-      if (readAndCalcFromChunk()) {
-        return true;
+      } finally {
+        operatorContext.addOperatorTime(CAL_AGG_FROM_FILE, System.nanoTime() - startTime);
       }
     }
-
     return false;
   }
 
   protected boolean readAndCalcFromChunk() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
-      if (canUseCurrentChunkStatistics()) {
-        Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics();
-        if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
-          if (ascending) {
-            return true;
-          } else {
-            seriesScanUtil.skipCurrentChunk();
-            continue;
+      long startTime = System.nanoTime();
+      try {
+        if (canUseCurrentChunkStatistics()) {
+          Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics();
+          if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+            if (ascending) {
+              return true;
+            } else {
+              seriesScanUtil.skipCurrentChunk();
+              continue;
+            }
           }
-        }
-        // calc from chunkMetaData
-        if (curTimeRange.contains(
-            chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) {
           // calc from chunkMetaData
-          Statistics[] statisticsList = new Statistics[subSensorSize];
-          for (int i = 0; i < subSensorSize; i++) {
-            statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
-          }
-          calcFromStatistics(statisticsList);
-          seriesScanUtil.skipCurrentChunk();
-          if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
-            return true;
-          } else {
-            continue;
+          if (curTimeRange.contains(
+              chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) {
+            // calc from chunkMetaData
+            Statistics[] statisticsList = new Statistics[subSensorSize];
+            for (int i = 0; i < subSensorSize; i++) {
+              statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
+            }
+            calcFromStatistics(statisticsList);
+            seriesScanUtil.skipCurrentChunk();
+            if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+              return true;
+            } else {
+              continue;
+            }
           }
         }
-      }
-
-      // read page
-      if (readAndCalcFromPage()) {
-        return true;
+        // read page
+        if (readAndCalcFromPage()) {
+          return true;
+        }
+      } finally {
+        operatorContext.addOperatorTime(CAL_AGG_FROM_CHUNK, System.nanoTime() - startTime);
       }
     }
     return false;
@@ -309,43 +325,48 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   protected boolean readAndCalcFromPage() throws IOException {
     while (seriesScanUtil.hasNextPage()) {
-      if (canUseCurrentPageStatistics()) {
-        Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics();
-        // There is no more eligible points in current time range
-        if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) {
-          if (ascending) {
-            return true;
-          } else {
-            seriesScanUtil.skipCurrentPage();
-            continue;
-          }
-        }
-        // can use pageHeader
-        if (curTimeRange.contains(
-            pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) {
-          Statistics[] statisticsList = new Statistics[subSensorSize];
-          for (int i = 0; i < subSensorSize; i++) {
-            statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
+      long startTime = System.nanoTime();
+      try {
+        if (canUseCurrentPageStatistics()) {
+          Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics();
+          // There is no more eligible points in current time range
+          if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) {
+            if (ascending) {
+              return true;
+            } else {
+              seriesScanUtil.skipCurrentPage();
+              continue;
+            }
           }
-          calcFromStatistics(statisticsList);
-          seriesScanUtil.skipCurrentPage();
-          if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
-            return true;
-          } else {
-            continue;
+          // can use pageHeader
+          if (curTimeRange.contains(
+              pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) {
+            Statistics[] statisticsList = new Statistics[subSensorSize];
+            for (int i = 0; i < subSensorSize; i++) {
+              statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
+            }
+            calcFromStatistics(statisticsList);
+            seriesScanUtil.skipCurrentPage();
+            if (isAllAggregatorsHasFinalResult(aggregators) && !isGroupByQuery) {
+              return true;
+            } else {
+              continue;
+            }
           }
         }
-      }
 
-      // calc from page data
-      TsBlock tsBlock = seriesScanUtil.nextPage();
-      if (tsBlock == null || tsBlock.isEmpty()) {
-        continue;
-      }
+        // calc from page data
+        TsBlock tsBlock = seriesScanUtil.nextPage();
+        if (tsBlock == null || tsBlock.isEmpty()) {
+          continue;
+        }
 
-      // calc from raw data
-      if (calcFromRawData(tsBlock)) {
-        return true;
+        // calc from raw data
+        if (calcFromRawData(tsBlock)) {
+          return true;
+        }
+      } finally {
+        operatorContext.addOperatorTime(CAL_AGG_FROM_PAGE, System.nanoTime() - startTime);
       }
     }
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 5b6d2abc86..7cf7dd3fc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -42,6 +43,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.ALLOC_EX_MEMORY;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CHECK_MEMORY;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.NODE_TO_OPERATOR;
+
 /**
  * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
  * Operator tree, but in the future, we may split one fragment instance into multiple pipeline to
@@ -49,6 +54,8 @@ import org.slf4j.LoggerFactory;
  */
 public class LocalExecutionPlanner {
 
+  private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
   private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class);
 
   /** allocated memory for operator execution */
@@ -69,13 +76,22 @@ public class LocalExecutionPlanner {
     LocalExecutionPlanContext context =
         new LocalExecutionPlanContext(types, instanceContext, dataRegion.getDataTTL());
 
+    long startTime = System.nanoTime();
     Operator root = plan.accept(new OperatorTreeGenerator(), context);
+    long endTime = System.nanoTime();
+    QUERY_STATISTICS.addCost(NODE_TO_OPERATOR, endTime - startTime);
 
+    startTime = endTime;
     // check whether current free memory is enough to execute current query
     checkMemory(root, instanceContext.getStateMachine());
+    endTime = System.nanoTime();
+    QUERY_STATISTICS.addCost(CHECK_MEMORY, endTime - startTime);
 
+    startTime = endTime;
     // calculate memory distribution of ISinkHandle/ISourceHandle
     setMemoryLimitForHandle(instanceContext.getId().toThrift(), plan);
+    endTime = System.nanoTime();
+    QUERY_STATISTICS.addCost(ALLOC_EX_MEMORY, endTime - startTime);
 
     ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
     instanceContext
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
index 8a75b37eeb..0f22ada07b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -48,6 +48,16 @@ public class QueryStatistics {
 
   public static final String LOCAL_EXECUTION_PLANNER = "LocalExecutionPlanner";
 
+  public static final String CREATE_FI_CONTEXT = "CreateFIContext";
+
+  public static final String CREATE_FI_EXEC = "CreateFIExec";
+
+  public static final String NODE_TO_OPERATOR = "ToOpTree";
+
+  public static final String CHECK_MEMORY = "CheckMem";
+
+  public static final String ALLOC_EX_MEMORY = "AllocExchangeMem";
+
   public static final String QUERY_EXECUTION = "QueryExecution";
 
   public static final String QUERY_RESOURCE_INIT = "QueryResourceInit";
@@ -69,12 +79,27 @@ public class QueryStatistics {
 
   public static final String AGG_SCAN_OPERATOR = "AbstractSeriesAggregationScanOperator";
 
+  public static final String CAL_NEXT_AGG_RES = "CalcNextAggRes";
+
+  public static final String CAL_AGG_FROM_RAW_DATA = "CalcAggFromRawData";
+
+  public static final String CAL_AGG_FROM_PAGE = "CalcAggFromPage";
+
+  public static final String CAL_AGG_FROM_CHUNK = "CalcAggFromChunk";
+
+  public static final String CAL_AGG_FROM_FILE = "CalcAggFromFile";
+
   public static final String FILTER_AND_PROJECT_OPERATOR = "FilterAndProjectOperator";
 
   public static final String SINGLE_INPUT_AGG_OPERATOR = "SingleInputAggregationOperator";
 
   public static final String PAGE_READER = "IPageReader";
   public static final String PARSER = "Parser";
+
+  public static final String CREATE_QUERY_EXEC = "CreateQueryExec";
+
+  public static final String SERIALIZE_TSBLOCK = "SerTsBlock";
+
   public static final String ANALYZER = "Analyzer";
   public static final String SCHEMA_FETCHER = "SchemaFetcher";
   public static final String PARTITION_FETCHER = "PartitionFetcher";
@@ -92,6 +117,20 @@ public class QueryStatistics {
 
   public static final String SEND_TSBLOCK = "SendTsBlock";
 
+  public static final String RESERVE_MEMORY = "ReserveMem";
+
+  public static final String NOTIFY_NEW_TSBLOCK = "NotifyNewTsBlock";
+
+  public static final String NOTIFY_END = "NotifyEnd";
+
+  public static final String FREE_MEM = "FreeMem";
+
+  public static final String SINK_HANDLE_END_LISTENER = "SinkHandleEndListener";
+
+  public static final String SINK_HANDLE_FINISH_LISTENER = "SinkHandleFinishListener";
+
+  public static final String CHECK_AND_INVOKE_ON_FINISHED = "CheckAndInvokeOnFinished";
+
   public static final String SET_NO_MORE_TSBLOCK = "SetNoMoreTsBlock";
 
   public static final String SERVER_RPC_RT = "ServerRpcRT";
@@ -172,7 +211,7 @@ public class QueryStatistics {
           + "us"
           + ", totalCount="
           + count
-          + ", avgOperationTime="
+          + ", avgOpTime="
           + (time / count)
           + "us"
           + '}';
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 95d3dded30..3de3872503 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -164,8 +164,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   private static final SelectResult OLD_SELECT_RESULT =
       (resp, queryExecution, fetchSize) -> {
+        long startTime = System.nanoTime();
         Pair<TSQueryDataSet, Boolean> pair =
             QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize);
+        QueryStatistics.getInstance()
+            .addCost(QueryStatistics.SERIALIZE_TSBLOCK, System.nanoTime() - startTime);
         resp.setQueryDataSet(pair.left);
         return pair.right;
       };
@@ -208,6 +211,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
 
       queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId);
+      long start = System.nanoTime();
       // create and cache dataset
       ExecutionResult result =
           COORDINATOR.execute(
@@ -218,6 +222,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               PARTITION_FETCHER,
               SCHEMA_FETCHER,
               req.getTimeout());
+      if (s.isQuery()) {
+        QueryStatistics.getInstance()
+            .addCost(QueryStatistics.CREATE_QUERY_EXEC, System.nanoTime() - start);
+      }
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index 3c9ee13e39..065807c27d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -62,7 +62,7 @@ public class LocalSourceHandleTest {
 
     // Local sink handle produces tsblocks.
     queue.add(Utils.createMockTsBlock(mockTsBlockSize));
-    queue.setNoMoreTsBlocks(true);
+    queue.setNoMoreTsBlocks();
     Assert.assertTrue(localSourceHandle.isBlocked().isDone());
     Assert.assertFalse(localSourceHandle.isAborted());
     Assert.assertFalse(localSourceHandle.isFinished());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
index e5336d2ac2..eb3983230f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
@@ -120,7 +120,7 @@ public class SharedTsBlockQueueTest {
             executor);
       } else {
         synchronized (queue) {
-          queue.setNoMoreTsBlocks(true);
+          queue.setNoMoreTsBlocks();
         }
       }
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
index 72ea2a9f92..726f112c61 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
@@ -81,13 +81,7 @@ public class GroupByFilter implements Filter, Serializable {
 
   @Override
   public boolean containStartEndTime(long startTime, long endTime) {
-    if (startTime >= this.startTime && endTime <= this.endTime) {
-      long minTime = startTime - this.startTime;
-      long maxTime = endTime - this.startTime;
-      long count = minTime / slidingStep;
-      return minTime <= interval + count * slidingStep && maxTime <= interval + count * slidingStep;
-    }
-    return false;
+    return startTime >= this.startTime && endTime <= this.endTime;
   }
 
   @Override