You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/09 09:52:05 UTC

[iotdb] branch QueryMetrics updated: Add more statistics

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/QueryMetrics by this push:
     new 3b48aec470 Add more statistics
3b48aec470 is described below

commit 3b48aec470280eede08f1c31f7b3f0ee030c6515
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Nov 9 17:51:55 2022 +0800

    Add more statistics
---
 .../iotdb/db/mpp/execution/driver/DataDriver.java  | 41 +++++++++++++---------
 .../mpp/execution/exchange/LocalSourceHandle.java  | 11 ++++++
 .../db/mpp/execution/exchange/SourceHandle.java    | 15 +++++++-
 .../db/mpp/plan/execution/QueryExecution.java      | 14 ++++++--
 .../iotdb/db/mpp/statistics/QueryStatistics.java   | 13 +++++++
 5 files changed, 75 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index ba6909e4fa..29850eaa38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -39,6 +39,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.QUERY_RESOURCE_INIT;
+
 /**
  * One dataDriver is responsible for one FragmentInstance which is for data query, which may
  * contains several series.
@@ -96,23 +98,30 @@ public class DataDriver extends Driver {
    * we should change all the blocked lock operation into tryLock
    */
   private void initialize() throws QueryProcessException {
-    List<DataSourceOperator> sourceOperators =
-        ((DataDriverContext) driverContext).getSourceOperators();
-    if (sourceOperators != null && !sourceOperators.isEmpty()) {
-      QueryDataSource dataSource = initQueryDataSource();
-      sourceOperators.forEach(
-          sourceOperator -> {
-            // construct QueryDataSource for source operator
-            QueryDataSource queryDataSource =
-                new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
-
-            queryDataSource.setDataTTL(dataSource.getDataTTL());
-
-            sourceOperator.initQueryDataSource(queryDataSource);
-          });
-    }
+    long startTime = System.nanoTime();
+    try {
+      List<DataSourceOperator> sourceOperators =
+          ((DataDriverContext) driverContext).getSourceOperators();
+      if (sourceOperators != null && !sourceOperators.isEmpty()) {
+        QueryDataSource dataSource = initQueryDataSource();
+        sourceOperators.forEach(
+            sourceOperator -> {
+              // construct QueryDataSource for source operator
+              QueryDataSource queryDataSource =
+                  new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
+
+              queryDataSource.setDataTTL(dataSource.getDataTTL());
+
+              sourceOperator.initQueryDataSource(queryDataSource);
+            });
+      }
 
-    this.init = true;
+      this.init = true;
+    } finally {
+      driverContext
+          .getFragmentInstanceContext()
+          .addOperationTime(QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
+    }
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 4db83c9b88..a77d147552 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -36,11 +37,15 @@ import java.nio.ByteBuffer;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_SOURCE_HANDLE_GET_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.LOCAL_SOURCE_HANDLE_SER_TSBLOCK;
 
 public class LocalSourceHandle implements ISourceHandle {
 
   private static final Logger logger = LoggerFactory.getLogger(LocalSourceHandle.class);
 
+  private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
   private final TFragmentInstanceId remoteFragmentInstanceId;
   private final TFragmentInstanceId localFragmentInstanceId;
   private final String localPlanNodeId;
@@ -88,6 +93,7 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public TsBlock receive() {
+    long startTime = System.nanoTime();
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       checkState();
 
@@ -107,6 +113,8 @@ public class LocalSourceHandle implements ISourceHandle {
       }
       checkAndInvokeOnFinished();
       return tsBlock;
+    } finally {
+      QUERY_STATISTICS.addCost(LOCAL_SOURCE_HANDLE_GET_TSBLOCK, System.nanoTime() - startTime);
     }
   }
 
@@ -114,10 +122,13 @@ public class LocalSourceHandle implements ISourceHandle {
   public ByteBuffer getSerializedTsBlock() throws IoTDBException {
     TsBlock tsBlock = receive();
     if (tsBlock != null) {
+      long startTime = System.nanoTime();
       try {
         return serde.serialize(tsBlock);
       } catch (Exception e) {
         throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
+      } finally {
+        QUERY_STATISTICS.addCost(LOCAL_SOURCE_HANDLE_SER_TSBLOCK, System.nanoTime() - startTime);
       }
     } else {
       return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 27e4cb9cd3..bcb41a97ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.statistics.QueryStatistics;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -49,11 +50,15 @@ import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.REMOTE_SOURCE_HANDLE_DESER_TSBLOCK;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.REMOTE_SOURCE_HANDLE_GET_TSBLOCK;
 
 public class SourceHandle implements ISourceHandle {
 
   private static final Logger logger = LoggerFactory.getLogger(SourceHandle.class);
 
+  private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
   public static final int MAX_ATTEMPT_TIMES = 3;
   private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000;
 
@@ -118,7 +123,12 @@ public class SourceHandle implements ISourceHandle {
   public synchronized TsBlock receive() {
     ByteBuffer tsBlock = getSerializedTsBlock();
     if (tsBlock != null) {
-      return serde.deserialize(tsBlock);
+      long startTime = System.nanoTime();
+      try {
+        return serde.deserialize(tsBlock);
+      } finally {
+        QUERY_STATISTICS.addCost(REMOTE_SOURCE_HANDLE_DESER_TSBLOCK, System.nanoTime() - startTime);
+      }
     } else {
       return null;
     }
@@ -126,6 +136,7 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public synchronized ByteBuffer getSerializedTsBlock() {
+    long startTime = System.nanoTime();
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
 
       checkState();
@@ -153,6 +164,8 @@ public class SourceHandle implements ISourceHandle {
       }
       trySubmitGetDataBlocksTask();
       return tsBlock;
+    } finally {
+      QUERY_STATISTICS.addCost(REMOTE_SOURCE_HANDLE_GET_TSBLOCK, System.nanoTime() - startTime);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index adc7c17f7f..ac737c77dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -83,6 +83,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Throwables.throwIfUnchecked;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
+import static org.apache.iotdb.db.mpp.statistics.QueryStatistics.WAIT_FOR_RESULT;
 
 /**
  * QueryExecution stores all the status of a query which is being prepared or running inside the MPP
@@ -94,6 +95,9 @@ public class QueryExecution implements IQueryExecution {
   private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class);
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final QueryStatistics QUERY_STATISTICS = QueryStatistics.getInstance();
+
   private static final int MAX_RETRY_COUNT = 3;
   private static final long RETRY_INTERVAL_IN_MS = 2000;
   private int retryCount = 0;
@@ -381,8 +385,14 @@ public class QueryExecution implements IQueryExecution {
           return Optional.empty();
         }
 
-        ListenableFuture<?> blocked = resultHandle.isBlocked();
-        blocked.get();
+        long startTime = System.nanoTime();
+        try {
+          ListenableFuture<?> blocked = resultHandle.isBlocked();
+          blocked.get();
+        } finally {
+          QUERY_STATISTICS.addCost(WAIT_FOR_RESULT, System.nanoTime() - startTime);
+        }
+
         if (!resultHandle.isFinished()) {
           // use the getSerializedTsBlock instead of receive to get ByteBuffer result
           T res = dataSupplier.get();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
index 55db559d19..31a1a611cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/QueryStatistics.java
@@ -50,6 +50,19 @@ public class QueryStatistics {
 
   public static final String QUERY_EXECUTION = "QueryExecution";
 
+  public static final String QUERY_RESOURCE_INIT = "QueryResourceInit";
+
+  public static final String LOCAL_SOURCE_HANDLE_GET_TSBLOCK = "LocalSourceHandleGetTsBlock";
+
+  public static final String LOCAL_SOURCE_HANDLE_SER_TSBLOCK = "LocalSourceHandleSerializeTsBlock";
+
+  public static final String REMOTE_SOURCE_HANDLE_GET_TSBLOCK = "RemoteSourceHandleGetTsBlock";
+
+  public static final String REMOTE_SOURCE_HANDLE_DESER_TSBLOCK =
+      "RemoteSourceHandleDeserializeTsBlock";
+
+  public static final String WAIT_FOR_RESULT = "WaitForResult";
+
   public static final String SERIES_SCAN_OPERATOR = "SeriesScanOperator";
 
   public static final String ALIGNED_SERIES_SCAN_OPERATOR = "AlignedSeriesScanOperator";