You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/20 01:15:50 UTC

[iotdb] 04/05: add metrics: driver scheduler

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/addQueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8e103177816b1c45b1b415412c534a895a67bb89
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Dec 20 00:43:37 2022 +0800

    add metrics: driver scheduler
---
 .../iotdb/commons/service/metric/enums/Metric.java |  3 +-
 .../db/mpp/execution/schedule/DriverScheduler.java | 21 +++++++
 .../db/mpp/execution/schedule/task/DriverTask.java | 19 ++++++
 .../db/mpp/metric/DriverSchedulerMetricSet.java    | 71 ++++++++++++++++++++++
 .../iotdb/db/mpp/metric/QueryMetricsManager.java   | 10 +++
 .../db/service/metrics/DataNodeMetricsHelper.java  |  8 +++
 6 files changed, 131 insertions(+), 1 deletion(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index e96be1f12e..5e809a83dd 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -70,7 +70,8 @@ public enum Metric {
   QUERY_EXECUTION,
   AGGREGATION,
   QUERY_RESOURCE,
-  DATA_EXCHANGE;
+  DATA_EXCHANGE,
+  DRIVER_SCHEDULER;
 
   @Override
   public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index a93f3ec69c..8684d159af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskID;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
@@ -50,10 +51,14 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
+import static org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
+
 /** the manager of fragment instances scheduling */
 public class DriverScheduler implements IDriverScheduler, IService {
 
   private static final Logger logger = LoggerFactory.getLogger(DriverScheduler.class);
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
   public static DriverScheduler getInstance() {
     return InstanceHolder.instance;
@@ -266,6 +271,16 @@ public class DriverScheduler implements IDriverScheduler, IService {
     return scheduler;
   }
 
+  /** Returns the number of driver tasks currently waiting in the ready queue. */
+  public long getReadyQueueTaskCount() {
+    return readyQueue.size();
+  }
+
+  /** Returns the number of driver tasks currently tracked as blocked. */
+  public long getBlockQueueTaskCount() {
+    return blockedTasks.size();
+  }
+
   @TestOnly
   IndexedBlockingQueue<DriverTask> getReadyQueue() {
     return readyQueue;
@@ -306,7 +321,12 @@ public class DriverScheduler implements IDriverScheduler, IService {
         if (task.getStatus() != DriverTaskStatus.BLOCKED) {
           return;
         }
+
         task.setStatus(DriverTaskStatus.READY);
+        QUERY_METRICS.recordTaskQueueTime(
+            BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
+        // The task re-enters the ready queue here, so restart its ready-queue wait clock.
+        task.setLastEnterReadyQueueTime(System.nanoTime());
         readyQueue.push(task);
         blockedTasks.remove(task);
       } finally {
@@ -321,7 +341,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
         if (task.getStatus() != DriverTaskStatus.READY) {
           return false;
         }
+
         task.setStatus(DriverTaskStatus.RUNNING);
+        QUERY_METRICS.recordTaskQueueTime(
+            READY_QUEUED_TIME, System.nanoTime() - task.getLastEnterReadyQueueTime());
       } finally {
         task.unlock();
       }
@@ -337,6 +360,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.READY);
+        task.setLastEnterReadyQueueTime(System.nanoTime());
         readyQueue.push(task);
       } finally {
         task.unlock();
@@ -352,6 +376,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.BLOCKED);
+        task.setLastEnterBlockQueueTime(System.nanoTime());
         blockedTasks.add(task);
       } finally {
         task.unlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index f65c1f3988..469159beaf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -53,6 +53,16 @@ public class DriverTask implements IDIndexedAccessible {
 
   private String abortCause;
 
+  /**
+   * {@link System#nanoTime()} reading taken when this task last entered the ready queue. Seeded at
+   * construction so that a task whose first enqueue does not set it is measured from its creation
+   * rather than from the arbitrary nanoTime origin.
+   */
+  private long lastEnterReadyQueueTime = System.nanoTime();
+
+  /** {@link System#nanoTime()} reading taken when this task last entered the blocked queue. */
+  private long lastEnterBlockQueueTime;
+
   /** Initialize a dummy instance for queryHolder */
   public DriverTask() {
     this(new StubFragmentInstance(), 0L, null);
@@ -146,6 +156,22 @@ public class DriverTask implements IDIndexedAccessible {
     this.abortCause = abortCause;
   }
 
+  public long getLastEnterReadyQueueTime() {
+    return lastEnterReadyQueueTime;
+  }
+
+  public void setLastEnterReadyQueueTime(long lastEnterReadyQueueTime) {
+    this.lastEnterReadyQueueTime = lastEnterReadyQueueTime;
+  }
+
+  public long getLastEnterBlockQueueTime() {
+    return lastEnterBlockQueueTime;
+  }
+
+  public void setLastEnterBlockQueueTime(long lastEnterBlockQueueTime) {
+    this.lastEnterBlockQueueTime = lastEnterBlockQueueTime;
+  }
+
   /** a comparator of ddl, the less the ddl is, the low order it has. */
   public static class TimeoutComparator implements Comparator<DriverTask> {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/DriverSchedulerMetricSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/DriverSchedulerMetricSet.java
new file mode 100644
index 0000000000..bc73518df8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/DriverSchedulerMetricSet.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Metrics describing the {@link DriverScheduler}: timers for how long driver tasks wait in the
+ * ready and blocked queues, plus auto gauges reporting the current size of each queue.
+ */
+public class DriverSchedulerMetricSet implements IMetricSet {
+
+  private static final String METRIC = Metric.DRIVER_SCHEDULER.toString();
+
+  /** Timer name: time a task waits in the ready queue before it starts running. */
+  public static final String READY_QUEUED_TIME = "ready_queued_time";
+  /** Timer name: time a task stays blocked before it is moved back to the ready queue. */
+  public static final String BLOCK_QUEUED_TIME = "block_queued_time";
+
+  /** Gauge name: number of tasks currently in the ready queue. */
+  public static final String READY_QUEUE_TASK_COUNT = "ready_queue_task_count";
+  /** Gauge name: number of tasks currently in the blocked queue. */
+  public static final String BLOCK_QUEUE_TASK_COUNT = "block_queue_task_count";
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    metricService.getOrCreateTimer(
+        METRIC, MetricLevel.IMPORTANT, Tag.NAME.toString(), READY_QUEUED_TIME);
+    metricService.getOrCreateTimer(
+        METRIC, MetricLevel.IMPORTANT, Tag.NAME.toString(), BLOCK_QUEUED_TIME);
+    metricService.createAutoGauge(
+        METRIC,
+        MetricLevel.IMPORTANT,
+        DriverScheduler.getInstance(),
+        DriverScheduler::getReadyQueueTaskCount,
+        Tag.NAME.toString(),
+        READY_QUEUE_TASK_COUNT);
+    metricService.createAutoGauge(
+        METRIC,
+        MetricLevel.IMPORTANT,
+        DriverScheduler.getInstance(),
+        DriverScheduler::getBlockQueueTaskCount,
+        Tag.NAME.toString(),
+        BLOCK_QUEUE_TASK_COUNT);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    metricService.remove(MetricType.TIMER, METRIC, Tag.NAME.toString(), READY_QUEUED_TIME);
+    metricService.remove(MetricType.TIMER, METRIC, Tag.NAME.toString(), BLOCK_QUEUED_TIME);
+    metricService.remove(
+        MetricType.AUTO_GAUGE, METRIC, Tag.NAME.toString(), READY_QUEUE_TASK_COUNT);
+    metricService.remove(
+        MetricType.AUTO_GAUGE, METRIC, Tag.NAME.toString(), BLOCK_QUEUE_TASK_COUNT);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
index a90f9b7241..10810a0d05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
@@ -106,6 +106,23 @@ public class QueryMetricsManager {
         GET_DATA_BLOCK_NUM);
   }
 
+  /**
+   * Records how long a driver task waited in one of the driver scheduler's queues.
+   *
+   * @param name timer name under the driver-scheduler metric (the scheduler passes READY_QUEUED_TIME or BLOCK_QUEUED_TIME)
+   * @param queueTimeInNanos measured wait time, in nanoseconds
+   */
+  public void recordTaskQueueTime(String name, long queueTimeInNanos) {
+    final String driverSchedulerMetric = Metric.DRIVER_SCHEDULER.toString();
+    metricService.timer(
+        queueTimeInNanos,
+        TimeUnit.NANOSECONDS,
+        driverSchedulerMetric,
+        MetricLevel.IMPORTANT,
+        Tag.NAME.toString(),
+        name);
+  }
+
   public static QueryMetricsManager getInstance() {
     return QueryMetricsManager.QueryMetricsManagerHolder.INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index 22096f8986..76f89f55a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -20,7 +20,11 @@
 package org.apache.iotdb.db.service.metrics;
 
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet;
+import org.apache.iotdb.db.mpp.metric.DriverSchedulerMetricSet;
+import org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.mpp.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet;
 import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
 import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
@@ -37,5 +41,9 @@ public class DataNodeMetricsHelper {
     // bind query related metrics
     MetricService.getInstance().addMetricSet(new QueryPlanCostMetricSet());
     MetricService.getInstance().addMetricSet(new SeriesScanCostMetricSet());
+    MetricService.getInstance().addMetricSet(new QueryExecutionMetricSet());
+    MetricService.getInstance().addMetricSet(new QueryResourceMetricSet());
+    MetricService.getInstance().addMetricSet(new DataExchangeMetricSet());
+    MetricService.getInstance().addMetricSet(new DriverSchedulerMetricSet());
   }
 }