You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/07 06:22:49 UTC

[iotdb] 01/04: add QueryStatistics

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch QueryMetrics0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1f87529758b2a02f88a89eeeb5cbe16774a7315d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 7 13:48:35 2022 +0800

    add QueryStatistics
---
 server/src/assembly/resources/conf/logback.xml     |  20 +++
 .../threadpool/ScheduledExecutorUtil.java          | 196 +++++++++++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   1 +
 .../iotdb/db/query/control/QueryStatistics.java    | 130 ++++++++++++++
 4 files changed, 347 insertions(+)

diff --git a/server/src/assembly/resources/conf/logback.xml b/server/src/assembly/resources/conf/logback.xml
index 7349c2b0b2..310053054d 100644
--- a/server/src/assembly/resources/conf/logback.xml
+++ b/server/src/assembly/resources/conf/logback.xml
@@ -265,6 +265,23 @@
             <level>INFO</level>
         </filter>
     </appender>
+    <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="QUERY_STATISTICS">
+        <file>${IOTDB_LOG_DIR}/log_query_statistics.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>${IOTDB_LOG_DIR}/log-query-statistics-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
+            <maxFileSize>10MB</maxFileSize>
+            <maxHistory>168</maxHistory>
+            <totalSizeCap>512MB</totalSizeCap>
+        </rollingPolicy>
+        <append>true</append>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
+            <charset>utf-8</charset>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
     <root level="info">
         <appender-ref ref="FILETRACE"/>
         <appender-ref ref="FILEDEBUG"/>
@@ -299,4 +316,7 @@
     <logger level="info" name="COMPACTION">
         <appender-ref ref="COMPACTION"/>
     </logger>
+    <logger level="info" name="QUERY_STATISTICS">
+        <appender-ref ref="QUERY_STATISTICS"/>
+    </logger>
 </configuration>
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java
new file mode 100644
index 0000000000..950947331d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class ScheduledExecutorUtil {
+
+  private static final Logger logger = LoggerFactory.getLogger(ScheduledExecutorUtil.class);
+
+  /**
+   * A safe wrapper method to make sure the exception thrown by the previous running will not affect
+   * the next one. Please reference the javadoc of {@link
+   * ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for more details.
+   *
+   * @param executor the ScheduledExecutorService instance.
+   * @param command same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
+   *     long, long, TimeUnit)}.
+   * @param initialDelay same parameter in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @param period same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
+   *     long, long, TimeUnit)}.
+   * @param unit same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
+   *     long, long, TimeUnit)}.
+   * @return the same return value of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
+   *     long, long, TimeUnit)}.
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> safelyScheduleAtFixedRate(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long period,
+      TimeUnit unit) {
+    return scheduleAtFixedRate(executor, command, initialDelay, period, unit, false);
+  }
+
+  /**
+   * A safe wrapper method to make sure the exception thrown by the previous running will not affect
+   * the next one. Please reference the javadoc of {@link
+   * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} for more
+   * details.
+   *
+   * @param executor the ScheduledExecutorService instance.
+   * @param command same parameter in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param initialDelay same parameter in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param delay same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable,
+   *     long, long, TimeUnit)}.
+   * @param unit same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable,
+   *     long, long, TimeUnit)}.
+   * @return the same return value of {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> safelyScheduleWithFixedDelay(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long delay,
+      TimeUnit unit) {
+    return scheduleWithFixedDelay(executor, command, initialDelay, delay, unit, false);
+  }
+
+  /**
+   * A wrapper method to have the same semantic with {@link
+   * ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, except for
+   * logging an error log when any uncaught exception happens.
+   *
+   * @param executor the ScheduledExecutorService instance.
+   * @param command same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
+   *     long, long, TimeUnit)}.
+   * @param initialDelay same parameter in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @param period same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
+   *     long, long, TimeUnit)}.
+   * @param unit same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
+   *     long, long, TimeUnit)}.
+   * @return the same return value of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
+   *     long, long, TimeUnit)}.
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> unsafelyScheduleAtFixedRate(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long period,
+      TimeUnit unit) {
+    return scheduleAtFixedRate(executor, command, initialDelay, period, unit, true);
+  }
+
+  /**
+   * A wrapper method to have the same semantic with {@link
+   * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, except for
+   * logging an error log when any uncaught exception happens.
+   *
+   * @param executor the ScheduledExecutorService instance.
+   * @param command same parameter in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param initialDelay same parameter in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param delay same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable,
+   *     long, long, TimeUnit)}.
+   * @param unit same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable,
+   *     long, long, TimeUnit)}.
+   * @return the same return value of {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> unsafelyScheduleWithFixedDelay(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long delay,
+      TimeUnit unit) {
+    return scheduleWithFixedDelay(executor, command, initialDelay, delay, unit, true);
+  }
+
+  @SuppressWarnings("unsafeThreadSchedule")
+  private static ScheduledFuture<?> scheduleAtFixedRate(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long period,
+      TimeUnit unit,
+      boolean unsafe) {
+    return executor.scheduleAtFixedRate(
+        () -> {
+          try {
+            command.run();
+          } catch (Throwable t) {
+            logger.error("Schedule task failed", t);
+            if (unsafe) {
+              throw t;
+            }
+          }
+        },
+        initialDelay,
+        period,
+        unit);
+  }
+
+  @SuppressWarnings("unsafeThreadSchedule")
+  private static ScheduledFuture<?> scheduleWithFixedDelay(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long delay,
+      TimeUnit unit,
+      boolean unsafe) {
+    return executor.scheduleWithFixedDelay(
+        () -> {
+          try {
+            command.run();
+          } catch (Throwable t) {
+            logger.error("Schedule task failed", t);
+            if (unsafe) {
+              throw t;
+            }
+          }
+        },
+        initialDelay,
+        delay,
+        unit);
+  }
+
+  public static RuntimeException propagate(Throwable throwable) {
+    logger.error("Run thread failed", throwable);
+    Throwables.throwIfUnchecked(throwable);
+    throw new RuntimeException(throwable);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 6d09cb4ac6..49059934f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -40,6 +40,7 @@ public class IoTDBConstant {
   public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
   public static final String COMPACTION_LOGGER_NAME = "COMPACTION";
   public static final String DOUBLE_LIVE_LOGGER_NAME = "DOUBLE_LIVE";
+  public static final String QUERY_STATISTICS_LOGGER_NAME = "QUERY_STATISTICS";
 
   public static final String IOTDB_JMX_PORT = "iotdb.jmx.port";
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java
new file mode 100644
index 0000000000..3ce09646f0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.control;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class QueryStatistics {
+
+  private static final long QUERY_STATISTICS_PRINT_INTERVAL_IN_MS = 10_000;
+
+  private static final Logger QUERY_STATISTICS_LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.QUERY_STATISTICS_LOGGER_NAME);
+
+  private final AtomicBoolean tracing = new AtomicBoolean(false);
+
+  private final Map<String, OperationStatistic> operationStatistics = new ConcurrentHashMap<>();
+
+  public static final String PARSER = "Parser";
+  public static final String PLANNER = "Planner";
+
+  private QueryStatistics() {
+    ScheduledExecutorService scheduledExecutor =
+        IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print");
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        scheduledExecutor,
+        this::printQueryStatistics,
+        0,
+        QUERY_STATISTICS_PRINT_INTERVAL_IN_MS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  private void printQueryStatistics() {
+    if (tracing.get()) {
+      operationStatistics.forEach(
+          (k, v) -> {
+            QUERY_STATISTICS_LOGGER.info("Operation: {}, Statistics: {}", k, v);
+          });
+    }
+  }
+
+  public static QueryStatistics getInstance() {
+    return QueryStatisticsHolder.INSTANCE;
+  }
+
+  public void addCost(String key, long costTimeInNanos) {
+    if (tracing.get()) {
+      operationStatistics
+          .computeIfAbsent(key, k -> new OperationStatistic())
+          .addTimeCost(costTimeInNanos);
+    }
+  }
+
+  public void disableTracing() {
+    tracing.set(false);
+    operationStatistics.clear();
+  }
+
+  public void enableTracing() {
+    tracing.set(true);
+    operationStatistics.clear();
+  }
+
+  private static class OperationStatistic {
+    // accumulated operation time in ns
+    private final AtomicLong totalTime;
+    private final AtomicLong totalCount;
+
+    public OperationStatistic() {
+      this.totalTime = new AtomicLong(0);
+      this.totalCount = new AtomicLong(0);
+    }
+
+    public void addTimeCost(long costTimeInNanos) {
+      totalTime.addAndGet(costTimeInNanos);
+      totalCount.incrementAndGet();
+    }
+
+    @Override
+    public String toString() {
+      long time = totalTime.get() / 1_000;
+      long count = totalCount.get();
+      return "{"
+          + "totalTime="
+          + time
+          + "us"
+          + ", totalCount="
+          + count
+          + ", avgOperationTime="
+          + (time / count)
+          + "us"
+          + '}';
+    }
+  }
+
+  private static class QueryStatisticsHolder {
+
+    private static final QueryStatistics INSTANCE = new QueryStatistics();
+
+    private QueryStatisticsHolder() {}
+  }
+}