You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/07 06:22:49 UTC
[iotdb] 01/04: add QueryStatistics
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch QueryMetrics0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1f87529758b2a02f88a89eeeb5cbe16774a7315d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 7 13:48:35 2022 +0800
add QueryStatistics
---
server/src/assembly/resources/conf/logback.xml | 20 +++
.../threadpool/ScheduledExecutorUtil.java | 196 +++++++++++++++++++++
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 +
.../iotdb/db/query/control/QueryStatistics.java | 130 ++++++++++++++
4 files changed, 347 insertions(+)
diff --git a/server/src/assembly/resources/conf/logback.xml b/server/src/assembly/resources/conf/logback.xml
index 7349c2b0b2..310053054d 100644
--- a/server/src/assembly/resources/conf/logback.xml
+++ b/server/src/assembly/resources/conf/logback.xml
@@ -265,6 +265,23 @@
<level>INFO</level>
</filter>
</appender>
+ <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="QUERY_STATISTICS">
+ <file>${IOTDB_LOG_DIR}/log_query_statistics.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <fileNamePattern>${IOTDB_LOG_DIR}/log-query-statistics-%d{yyyyMMdd}.%i.log.gz</fileNamePattern>
+ <maxFileSize>10MB</maxFileSize>
+ <maxHistory>168</maxHistory>
+ <totalSizeCap>512MB</totalSizeCap>
+ </rollingPolicy>
+ <append>true</append>
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
+ <charset>utf-8</charset>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
<root level="info">
<appender-ref ref="FILETRACE"/>
<appender-ref ref="FILEDEBUG"/>
@@ -299,4 +316,7 @@
<logger level="info" name="COMPACTION">
<appender-ref ref="COMPACTION"/>
</logger>
+ <logger level="info" name="QUERY_STATISTICS">
+ <appender-ref ref="QUERY_STATISTICS"/>
+ </logger>
</configuration>
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java
new file mode 100644
index 0000000000..950947331d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/ScheduledExecutorUtil.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class ScheduledExecutorUtil {
+
+  private static final Logger logger = LoggerFactory.getLogger(ScheduledExecutorUtil.class);
+
+  /**
+   * Schedules {@code command} at a fixed rate so that a failed run cannot cancel the later ones:
+   * anything it throws is logged as an error and swallowed. Refer to {@link
+   * ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for details.
+   *
+   * @param executor the ScheduledExecutorService instance.
+   * @param command same parameter as in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @param initialDelay same parameter as in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @param period same parameter as in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @param unit same parameter as in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @return the value returned by {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> safelyScheduleAtFixedRate(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long period,
+      TimeUnit unit) {
+    return scheduleAtFixedRate(executor, command, initialDelay, period, unit, false);
+  }
+
+  /**
+   * Schedules {@code command} with a fixed delay so that a failed run cannot cancel the later
+   * ones: anything it throws is logged as an error and swallowed. Refer to {@link
+   * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} for details.
+   *
+   * @param executor the ScheduledExecutorService instance.
+   * @param command same parameter as in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param initialDelay same parameter as in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param delay same parameter as in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param unit same parameter as in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @return the value returned by {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> safelyScheduleWithFixedDelay(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long delay,
+      TimeUnit unit) {
+    return scheduleWithFixedDelay(executor, command, initialDelay, delay, unit, false);
+  }
+
+  /**
+   * Schedules {@code command} exactly as {@link
+   * ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} does, except
+   * that anything it throws is logged as an error before being rethrown.
+   *
+   * @param executor the ScheduledExecutorService instance.
+   * @param command same parameter as in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @param initialDelay same parameter as in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @param period same parameter as in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @param unit same parameter as in {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   * @return the value returned by {@link
+   *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> unsafelyScheduleAtFixedRate(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long period,
+      TimeUnit unit) {
+    return scheduleAtFixedRate(executor, command, initialDelay, period, unit, true);
+  }
+
+  /**
+   * Schedules {@code command} exactly as {@link
+   * ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} does, except
+   * that anything it throws is logged as an error before being rethrown.
+   *
+   * @param executor the ScheduledExecutorService instance.
+   * @param command same parameter as in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param initialDelay same parameter as in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param delay same parameter as in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @param unit same parameter as in {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   * @return the value returned by {@link
+   *     ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> unsafelyScheduleWithFixedDelay(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long delay,
+      TimeUnit unit) {
+    return scheduleWithFixedDelay(executor, command, initialDelay, delay, unit, true);
+  }
+
+  /** Shared fixed-rate implementation; {@code rethrow} selects the unsafe behavior. */
+  @SuppressWarnings("unsafeThreadSchedule")
+  private static ScheduledFuture<?> scheduleAtFixedRate(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long period,
+      TimeUnit unit,
+      boolean rethrow) {
+    return executor.scheduleAtFixedRate(guard(command, rethrow), initialDelay, period, unit);
+  }
+
+  /** Shared fixed-delay implementation; {@code rethrow} selects the unsafe behavior. */
+  @SuppressWarnings("unsafeThreadSchedule")
+  private static ScheduledFuture<?> scheduleWithFixedDelay(
+      ScheduledExecutorService executor,
+      Runnable command,
+      long initialDelay,
+      long delay,
+      TimeUnit unit,
+      boolean rethrow) {
+    return executor.scheduleWithFixedDelay(guard(command, rethrow), initialDelay, delay, unit);
+  }
+
+  /**
+   * Returns a task that runs {@code command} and logs, then optionally rethrows, its failure.
+   *
+   * @param command the task to run.
+   * @param rethrow whether a caught throwable is rethrown after being logged.
+   */
+  private static Runnable guard(Runnable command, boolean rethrow) {
+    return () -> {
+      try {
+        command.run();
+      } catch (Throwable t) {
+        logger.error("Schedule task failed", t);
+        if (rethrow) {
+          throw t;
+        }
+      }
+    };
+  }
+
+  /**
+   * Logs {@code throwable} and rethrows it, wrapped in a RuntimeException if it is checked.
+   *
+   * @param throwable the failure to log and rethrow.
+   * @return nothing; declared only so that callers can write {@code throw propagate(e)}.
+   */
+  public static RuntimeException propagate(Throwable throwable) {
+    logger.error("Run thread failed", throwable);
+    Throwables.throwIfUnchecked(throwable);
+    throw new RuntimeException(throwable);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 6d09cb4ac6..49059934f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -40,6 +40,7 @@ public class IoTDBConstant {
public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
public static final String COMPACTION_LOGGER_NAME = "COMPACTION";
public static final String DOUBLE_LIVE_LOGGER_NAME = "DOUBLE_LIVE";
+ public static final String QUERY_STATISTICS_LOGGER_NAME = "QUERY_STATISTICS";
public static final String IOTDB_JMX_PORT = "iotdb.jmx.port";
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java
new file mode 100644
index 0000000000..3ce09646f0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryStatistics.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.control;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Singleton that, while tracing is enabled, accumulates per-operation time costs and logs them.
+ */
+public class QueryStatistics {
+
+  private static final long QUERY_STATISTICS_PRINT_INTERVAL_IN_MS = 10_000;
+
+  private static final Logger QUERY_STATISTICS_LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.QUERY_STATISTICS_LOGGER_NAME);
+
+  private final AtomicBoolean tracing = new AtomicBoolean(false);
+
+  private final Map<String, OperationStatistic> operationStatistics = new ConcurrentHashMap<>();
+
+  public static final String PARSER = "Parser";
+  public static final String PLANNER = "Planner";
+
+  private QueryStatistics() {
+    ScheduledExecutorService scheduledExecutor =
+        IoTDBThreadPoolFactory.newScheduledThreadPool(1, "Query-Statistics-Print");
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        scheduledExecutor,
+        this::printQueryStatistics,
+        0,
+        QUERY_STATISTICS_PRINT_INTERVAL_IN_MS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /** Logs one line per recorded operation; does nothing while tracing is disabled. */
+  private void printQueryStatistics() {
+    if (tracing.get()) {
+      operationStatistics.forEach(
+          (k, v) -> QUERY_STATISTICS_LOGGER.info("Operation: {}, Statistics: {}", k, v));
+    }
+  }
+
+  /** Returns the singleton; the first call creates it and starts the periodic printing task. */
+  public static QueryStatistics getInstance() {
+    return QueryStatisticsHolder.INSTANCE;
+  }
+
+  /**
+   * Records one execution of an operation; the call is ignored while tracing is disabled.
+   *
+   * @param key name of the operation, e.g. {@link #PARSER}
+   * @param costTimeInNanos elapsed time of this execution in nanoseconds
+   */
+  public void addCost(String key, long costTimeInNanos) {
+    if (tracing.get()) {
+      operationStatistics
+          .computeIfAbsent(key, k -> new OperationStatistic())
+          .addTimeCost(costTimeInNanos);
+    }
+  }
+
+  /** Stops recording and discards everything collected so far. */
+  public void disableTracing() {
+    tracing.set(false);
+    operationStatistics.clear();
+  }
+
+  /** Starts recording, discarding whatever was collected before. */
+  public void enableTracing() {
+    tracing.set(true);
+    operationStatistics.clear();
+  }
+
+  /** Accumulated time (in ns) and invocation count of a single operation. */
+  private static class OperationStatistic {
+    private final AtomicLong totalTime = new AtomicLong(0);
+    private final AtomicLong totalCount = new AtomicLong(0);
+
+    public void addTimeCost(long costTimeInNanos) {
+      totalTime.addAndGet(costTimeInNanos);
+      totalCount.incrementAndGet();
+    }
+
+    @Override
+    public String toString() {
+      long time = totalTime.get() / 1_000;
+      long count = totalCount.get();
+      // count is still 0 if this races with an entry's first addTimeCost; never divide by zero
+      long avg = count == 0 ? 0 : time / count;
+      return "{totalTime=" + time + "us, totalCount=" + count
+          + ", avgOperationTime=" + avg + "us}";
+    }
+  }
+
+  private static class QueryStatisticsHolder {
+
+    private static final QueryStatistics INSTANCE = new QueryStatistics();
+
+    private QueryStatisticsHolder() {}
+  }
+}