You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/21 03:48:16 UTC

[iotdb] 01/04: Add more metrics

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 73b7ec32071286e7c9c072fbd67a8c646a73536a
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Dec 20 19:50:49 2022 +0800

    Add more metrics
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  5 ++++
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  3 +-
 .../iotdb/db/mpp/execution/driver/DataDriver.java  | 33 +++++++++++++++-------
 .../AbstractSeriesAggregationScanOperator.java     | 20 +++++++++----
 .../execution/operator/source/SeriesScanUtil.java  |  4 ++-
 .../iotdb/db/mpp/statistics/QueryStatistics.java   |  9 ++++++
 .../iotdb/tsfile/read/filter/GroupByFilter.java    | 18 +-----------
 7 files changed, 57 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 353f546c92..5b5a88b0c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1745,11 +1745,16 @@ public class DataRegion implements IDataRegionForQuery {
       }
     }
 
+    boolean meet = false;
     for (TsFileResource tsFileResource : tsFileResources) {
       if (!tsFileResource.isSatisfied(
           singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
+        if (meet) {
+          break;
+        }
         continue;
       }
+      meet = true;
       closeQueryLock.readLock().lock();
       try {
         if (tsFileResource.isClosed()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index c893e4015f..dd2896556d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGGREGATOR_PROCESS_TSBLOCK;
 
 public class Aggregator {
 
@@ -80,7 +81,7 @@ public class Aggregator {
           Math.max(lastReadReadIndex, accumulator.addInput(controlTimeAndValueColumn, curWindow));
     }
     QueryStatistics.getInstance()
-        .addCost("AggregationScan: calcFromRawData", System.nanoTime() - startTime);
+        .addCost(AGGREGATOR_PROCESS_TSBLOCK, System.nanoTime() - startTime);
     return lastReadReadIndex;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index f766943236..893cdcd063 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -40,7 +40,10 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.ADD_REFERENCE;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.INIT_SOURCE_OP;
 import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.QUERY_RESOURCE_INIT;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.QUERY_RESOURCE_LIST;
 
 /**
  * One dataDriver is responsible for one FragmentInstance which is for data query, which may
@@ -105,16 +108,19 @@ public class DataDriver extends Driver {
           ((DataDriverContext) driverContext).getSourceOperators();
       if (sourceOperators != null && !sourceOperators.isEmpty()) {
         QueryDataSource dataSource = initQueryDataSource();
-        sourceOperators.forEach(
-            sourceOperator -> {
-              // construct QueryDataSource for source operator
-              QueryDataSource queryDataSource =
-                  new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
+        long start = System.nanoTime();
+        for (DataSourceOperator sourceOperator : sourceOperators) {
+          // construct QueryDataSource for source operator
+          QueryDataSource queryDataSource =
+              new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
 
-              queryDataSource.setDataTTL(dataSource.getDataTTL());
+          queryDataSource.setDataTTL(dataSource.getDataTTL());
 
-              sourceOperator.initQueryDataSource(queryDataSource);
-            });
+          sourceOperator.initQueryDataSource(queryDataSource);
+        }
+        driverContext
+            .getFragmentInstanceContext()
+            .addOperationTime(INIT_SOURCE_OP, System.nanoTime() - start);
       }
 
       this.init = true;
@@ -142,16 +148,24 @@ public class DataDriver extends Driver {
           pathList.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
 
       Filter timeFilter = context.getTimeFilter();
+      long startTime = System.nanoTime();
       QueryDataSource dataSource =
           dataRegion.query(
               pathList,
               selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
               driverContext.getFragmentInstanceContext(),
               timeFilter != null ? timeFilter.copy() : null);
+      driverContext
+          .getFragmentInstanceContext()
+          .addOperationTime(QUERY_RESOURCE_LIST, System.nanoTime() - startTime);
 
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
+      startTime = System.nanoTime();
       addUsedFilesForQuery(dataSource);
+      driverContext
+          .getFragmentInstanceContext()
+          .addOperationTime(ADD_REFERENCE, System.nanoTime() - startTime);
 
       return dataSource;
     } finally {
@@ -196,8 +210,7 @@ public class DataDriver extends Driver {
    */
   private void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
     Set<TsFileResource> pathSet = isClosed ? closedFilePaths : unClosedFilePaths;
-    if (!pathSet.contains(tsFile)) {
-      pathSet.add(tsFile);
+    if (pathSet.add(tsFile)) {
       FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index aeaf19b3a1..d398966eae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -43,6 +43,7 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendA
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
 import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.AGG_SCAN_OPERATOR;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.BUILD_AGG_RES;
 import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_CHUNK;
 import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_FILE;
 import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.CAL_AGG_FROM_PAGE;
@@ -70,7 +71,9 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
   protected final List<Aggregator> aggregators;
 
   // using for building result tsBlock
-  protected final TsBlockBuilder resultTsBlockBuilder;
+  private TsBlockBuilder resultTsBlockBuilder;
+
+  private final List<TSDataType> dataTypes;
 
   protected boolean finished = false;
 
@@ -96,11 +99,10 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
     this.aggregators = aggregators;
     this.timeRangeIterator = timeRangeIterator;
 
-    List<TSDataType> dataTypes = new ArrayList<>();
+    this.dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
-    this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
 
     this.maxRetainedSize =
         (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
@@ -144,6 +146,12 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
 
   @Override
   public TsBlock next() {
+    if (resultTsBlockBuilder == null) {
+      resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+    } else {
+      // reset lazily: the builder is cleared at the start of the following `next` call
+      resultTsBlockBuilder.reset();
+    }
     // start stopwatch
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
@@ -166,9 +174,7 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
       }
 
       if (resultTsBlockBuilder.getPositionCount() > 0) {
-        TsBlock resultTsBlock = resultTsBlockBuilder.build();
-        resultTsBlockBuilder.reset();
-        return resultTsBlock;
+        return resultTsBlockBuilder.build();
       } else {
         return null;
       }
@@ -214,8 +220,10 @@ public abstract class AbstractSeriesAggregationScanOperator implements DataSourc
   }
 
   protected void updateResultTsBlock() {
+    long startTime = System.nanoTime();
     appendAggregationResult(
         resultTsBlockBuilder, aggregators, timeRangeIterator.currentOutputTime());
+    operatorContext.addOperatorTime(BUILD_AGG_RES, System.nanoTime() - startTime);
   }
 
   protected boolean calcFromCachedData() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 160a794230..49ced2de96 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -160,7 +160,9 @@ public class SeriesScanUtil {
   }
 
   public void initQueryDataSource(QueryDataSource dataSource) {
-    QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+    if (!dataSource.getUnseqResources().isEmpty()) {
+      QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), orderUtils.getAscending());
+    }
     this.dataSource = dataSource;
     this.timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
     if (this.valueFilter != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
index 0f22ada07b..043dde1a72 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -62,6 +62,11 @@ public class QueryStatistics {
 
   public static final String QUERY_RESOURCE_INIT = "QueryResourceInit";
 
+  public static final String INIT_SOURCE_OP = "InitSourceOp";
+
+  public static final String QUERY_RESOURCE_LIST = "TsFileList";
+  public static final String ADD_REFERENCE = "AddRef";
+
   public static final String LOCAL_SOURCE_HANDLE_GET_TSBLOCK = "LocalSourceHandleGetTsBlock";
 
   public static final String LOCAL_SOURCE_HANDLE_SER_TSBLOCK = "LocalSourceHandleSerializeTsBlock";
@@ -83,12 +88,16 @@ public class QueryStatistics {
 
   public static final String CAL_AGG_FROM_RAW_DATA = "CalcAggFromRawData";
 
+  public static final String AGGREGATOR_PROCESS_TSBLOCK = "AggProcTsBlock";
+
   public static final String CAL_AGG_FROM_PAGE = "CalcAggFromPage";
 
   public static final String CAL_AGG_FROM_CHUNK = "CalcAggFromChunk";
 
   public static final String CAL_AGG_FROM_FILE = "CalcAggFromFile";
 
+  public static final String BUILD_AGG_RES = "BuildAggRes";
+
   public static final String FILTER_AND_PROJECT_OPERATOR = "FilterAndProjectOperator";
 
   public static final String SINGLE_INPUT_AGG_OPERATOR = "SingleInputAggregationOperator";
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
index 726f112c61..2bd7c14c2d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
@@ -60,23 +60,7 @@ public class GroupByFilter implements Filter, Serializable {
 
   @Override
   public boolean satisfyStartEndTime(long startTime, long endTime) {
-    if (endTime < this.startTime || startTime >= this.endTime) {
-      return false;
-    } else if (startTime <= this.startTime) {
-      return true;
-    } else {
-      long minTime = startTime - this.startTime;
-      long count = minTime / slidingStep;
-      if (minTime <= interval + count * slidingStep) {
-        return true;
-      } else {
-        if (this.endTime <= (count + 1) * slidingStep + this.startTime) {
-          return false;
-        } else {
-          return endTime >= (count + 1) * slidingStep + this.startTime;
-        }
-      }
-    }
+    return startTime <= this.endTime && endTime >= this.startTime;
   }
 
   @Override