You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/05/31 09:15:34 UTC

[iotdb] branch cpu-monitor updated (031d0a168fb -> fe815fc559b)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch cpu-monitor
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 031d0a168fb refactor getModuleTheThreadBelongs
     new 4e560892db4 edit according to review
     new fe815fc559b edit according to review

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../metrics/metricsets/cpu/CpuUsageMetrics.java    |  1 +
 .../commons/concurrent/IoTDBThreadPoolFactory.java | 43 +++++++++++---
 .../iotdb/commons/concurrent/ThreadName.java       |  5 +-
 .../commons/concurrent/ThreadPoolMetrics.java      | 16 ++++++
 .../iotdb/commons/concurrent/WrappedCallable.java  | 17 ------
 .../iotdb/commons/concurrent/WrappedRunnable.java  | 18 ------
 .../WrappedSingleThreadExecutorService.java        | 61 ++++++--------------
 .../WrappedSingleThreadScheduledExecutor.java      | 67 ++++++++--------------
 .../db/service/metrics/DataNodeMetricsHelper.java  | 29 +++++-----
 .../iotdb/db/service/metrics/ProcessMetrics.java   |  4 +-
 10 files changed, 116 insertions(+), 145 deletions(-)


[iotdb] 01/02: edit according to review

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch cpu-monitor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4e560892db479faa667e5287803dc2cf39998792
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Wed May 31 17:11:38 2023 +0800

    edit according to review
---
 .../commons/concurrent/IoTDBThreadPoolFactory.java | 43 +++++++++++---
 .../iotdb/commons/concurrent/ThreadName.java       |  5 +-
 .../iotdb/commons/concurrent/WrappedCallable.java  | 17 ------
 .../iotdb/commons/concurrent/WrappedRunnable.java  | 18 ------
 .../WrappedSingleThreadExecutorService.java        | 61 ++++++--------------
 .../WrappedSingleThreadScheduledExecutor.java      | 67 ++++++++--------------
 .../db/service/metrics/DataNodeMetricsHelper.java  | 29 +++++-----
 .../iotdb/db/service/metrics/ProcessMetrics.java   |  4 +-
 8 files changed, 99 insertions(+), 145 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
index 2ecb29f42e7..d8325d9c921 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
@@ -32,7 +32,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -80,7 +82,14 @@ public class IoTDBThreadPoolFactory {
   public static ExecutorService newFixedThreadPoolWithDaemonThread(int nThreads, String poolName) {
     logger.info(NEW_FIXED_THREAD_POOL_LOGGER_FORMAT, poolName, nThreads);
     return new WrappedSingleThreadExecutorService(
-        Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName)), poolName);
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new IoTThreadFactory(poolName)),
+        poolName);
   }
 
   public static ExecutorService newFixedThreadPool(
@@ -105,20 +114,41 @@ public class IoTDBThreadPoolFactory {
   public static ExecutorService newSingleThreadExecutor(String poolName) {
     logger.info(NEW_SINGLE_THREAD_POOL_LOGGER_FORMAT, poolName);
     return new WrappedSingleThreadExecutorService(
-        Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName)), poolName);
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new IoTThreadFactory(poolName)),
+        poolName);
   }
 
   public static ExecutorService newSingleThreadExecutorWithDaemon(String poolName) {
     logger.info(NEW_SINGLE_THREAD_POOL_LOGGER_FORMAT, poolName);
     return new WrappedSingleThreadExecutorService(
-        Executors.newSingleThreadExecutor(new IoTDBDaemonThreadFactory(poolName)), poolName);
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new IoTThreadFactory(poolName)),
+        poolName);
   }
 
   public static ExecutorService newSingleThreadExecutor(
       String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info(NEW_SINGLE_THREAD_POOL_LOGGER_FORMAT, poolName);
     return new WrappedSingleThreadExecutorService(
-        Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName, handler)), poolName);
+        new ThreadPoolExecutor(
+            1,
+            1,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new IoTThreadFactory(poolName)),
+        poolName);
   }
 
   /**
@@ -185,15 +215,14 @@ public class IoTDBThreadPoolFactory {
   public static ScheduledExecutorService newSingleThreadScheduledExecutor(String poolName) {
     logger.info(NEW_SINGLE_SCHEDULED_THREAD_POOL_LOGGER_FORMAT, poolName);
     return new WrappedSingleThreadScheduledExecutor(
-        Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory(poolName)), poolName);
+        new ScheduledThreadPoolExecutor(1, new IoTThreadFactory(poolName)), poolName);
   }
 
   public static ScheduledExecutorService newSingleThreadScheduledExecutor(
       String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info(NEW_SINGLE_SCHEDULED_THREAD_POOL_LOGGER_FORMAT, poolName);
     return new WrappedSingleThreadScheduledExecutor(
-        Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory(poolName, handler)),
-        poolName);
+        new ScheduledThreadPoolExecutor(1, new IoTThreadFactory(poolName)), poolName);
   }
 
   /**
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index ca749f68d38..661944c2edc 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -145,7 +145,8 @@ public enum ThreadName {
   STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
   PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
   PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE("Pipe-WAL-Resource-TTL-Checker-Service"),
-  ;
+  // the unknown thread name is used for metrics
+  UNKOWN("UNKNOWN");
 
   private final String name;
   private static Set<ThreadName> queryThreadNames =
@@ -354,6 +355,6 @@ public enum ThreadName {
         }
       }
     }
-    return null;
+    return ThreadName.UNKOWN;
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/WrappedCallable.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/WrappedCallable.java
index 2e6f52cdcfe..b4328316fb0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/WrappedCallable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/WrappedCallable.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.commons.concurrent;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /** A wrapper for {@link Callable} logging errors when uncaught exception is thrown. */
 public abstract class WrappedCallable<V> implements Callable<V> {
@@ -48,20 +47,4 @@ public abstract class WrappedCallable<V> implements Callable<V> {
       }
     };
   }
-
-  public static <V> Callable<V> wrapWithCount(Callable<V> callable, AtomicInteger count) {
-    if (callable instanceof WrappedCallable) {
-      return callable;
-    }
-    return new WrappedCallable<V>() {
-      @Override
-      public V callMayThrow() throws Exception {
-        try {
-          return callable.call();
-        } finally {
-          count.incrementAndGet();
-        }
-      }
-    };
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/WrappedRunnable.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/WrappedRunnable.java
index 5198949bb55..9d15889c407 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/WrappedRunnable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/WrappedRunnable.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.commons.concurrent;
 
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 /** A wrapper for {@link Runnable} logging errors when uncaught exception is thrown. */
 public abstract class WrappedRunnable implements Runnable {
 
@@ -47,20 +45,4 @@ public abstract class WrappedRunnable implements Runnable {
       }
     };
   }
-
-  public static Runnable wrapWithCount(Runnable runnable, AtomicInteger count) {
-    if (runnable instanceof WrappedRunnable) {
-      return runnable;
-    }
-    return new WrappedRunnable() {
-      @Override
-      public void runMayThrow() {
-        try {
-          runnable.run();
-        } finally {
-          count.incrementAndGet();
-        }
-      }
-    };
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadExecutorService.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadExecutorService.java
index 80b76b526b5..eb522f6d4c9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadExecutorService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadExecutorService.java
@@ -26,16 +26,15 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.service.JMXService;
 
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 public class WrappedSingleThreadExecutorService
@@ -43,8 +42,6 @@ public class WrappedSingleThreadExecutorService
   private final String mbeanName;
 
   ExecutorService service;
-  private final AtomicInteger taskCount = new AtomicInteger(0);
-  private final AtomicInteger runCount = new AtomicInteger(0);
 
   public WrappedSingleThreadExecutorService(ExecutorService service, String mbeanName) {
     this.service = service;
@@ -84,120 +81,100 @@ public class WrappedSingleThreadExecutorService
 
   @Override
   public <T> Future<T> submit(Callable<T> task) {
-    taskCount.incrementAndGet();
-    return service.submit(WrappedCallable.wrapWithCount(task, runCount));
+    return service.submit(WrappedCallable.wrap(task));
   }
 
   @Override
   public <T> Future<T> submit(Runnable task, T result) {
-    taskCount.incrementAndGet();
-    return service.submit(WrappedRunnable.wrapWithCount(task, runCount), result);
+    return service.submit(WrappedRunnable.wrap(task), result);
   }
 
   @Override
   public Future<?> submit(Runnable task) {
-    taskCount.incrementAndGet();
-    return service.submit(WrappedRunnable.wrapWithCount(task, runCount));
+    return service.submit(WrappedRunnable.wrap(task));
   }
 
   @Override
   public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
       throws InterruptedException {
-    taskCount.addAndGet(tasks.size());
     return service.invokeAll(
-        tasks.stream()
-            .map(x -> WrappedCallable.wrapWithCount(x, runCount))
-            .collect(Collectors.toList()));
+        tasks.stream().map(WrappedCallable::wrap).collect(Collectors.toList()));
   }
 
   @Override
   public <T> List<Future<T>> invokeAll(
       Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
       throws InterruptedException {
-    taskCount.addAndGet(tasks.size());
     return service.invokeAll(
-        tasks.stream()
-            .map(x -> WrappedCallable.wrapWithCount(x, runCount))
-            .collect(Collectors.toList()),
-        timeout,
-        unit);
+        tasks.stream().map(WrappedCallable::wrap).collect(Collectors.toList()), timeout, unit);
   }
 
   @Override
   public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
       throws InterruptedException, ExecutionException {
-    taskCount.addAndGet(tasks.size());
     return service.invokeAny(
-        tasks.stream()
-            .map(x -> WrappedCallable.wrapWithCount(x, runCount))
-            .collect(Collectors.toList()));
+        tasks.stream().map(WrappedCallable::wrap).collect(Collectors.toList()));
   }
 
   @Override
   public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException {
-    taskCount.addAndGet(tasks.size());
     return service.invokeAny(
-        tasks.stream()
-            .map(x -> WrappedCallable.wrapWithCount(x, runCount))
-            .collect(Collectors.toList()),
-        timeout,
-        unit);
+        tasks.stream().map(WrappedCallable::wrap).collect(Collectors.toList()), timeout, unit);
   }
 
   @Override
   public void execute(Runnable command) {
-    taskCount.incrementAndGet();
-    service.execute(WrappedRunnable.wrapWithCount(command, runCount));
+    service.execute(WrappedRunnable.wrap(command));
   }
 
   @Override
   public int getCorePoolSize() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getCorePoolSize();
   }
 
   @Override
   public boolean prestartCoreThread() {
-    return false;
+    return ((ThreadPoolExecutor) service).prestartCoreThread();
   }
 
   @Override
   public int getMaximumPoolSize() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getMaximumPoolSize();
   }
 
   @Override
   public Queue<Runnable> getQueue() {
-    return new LinkedList<>();
+    return ((ThreadPoolExecutor) service).getQueue();
   }
 
   @Override
   public int getQueueLength() {
-    return taskCount.get() - runCount.get();
+    return ((ThreadPoolExecutor) service).getQueue().size();
   }
 
   @Override
   public int getPoolSize() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getPoolSize();
   }
 
   @Override
   public int getActiveCount() {
-    return taskCount.get() - runCount.get() > 0 ? 1 : 0;
+    return ((ThreadPoolExecutor) service).getActiveCount();
   }
 
   @Override
   public int getLargestPoolSize() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getLargestPoolSize();
   }
 
   @Override
   public long getTaskCount() {
-    return taskCount.get();
+    return ((ThreadPoolExecutor) service).getTaskCount();
   }
 
   @Override
   public long getCompletedTaskCount() {
-    return runCount.get();
+    return ((ThreadPoolExecutor) service).getCompletedTaskCount();
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
index 77e36b569a5..ef9aa07c063 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.service.JMXService;
 
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.Callable;
@@ -34,17 +33,15 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 public class WrappedSingleThreadScheduledExecutor
     implements ScheduledExecutorService, WrappedSingleThreadScheduledExecutorMBean {
   private final String mbeanName;
   ScheduledExecutorService service;
-  private final AtomicInteger taskCount = new AtomicInteger(0);
-  private final AtomicInteger runCount = new AtomicInteger(0);
 
   public WrappedSingleThreadScheduledExecutor(ScheduledExecutorService service, String mbeanName) {
     this.service = service;
@@ -57,32 +54,26 @@ public class WrappedSingleThreadScheduledExecutor
 
   @Override
   public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-    taskCount.incrementAndGet();
-    return service.schedule(WrappedRunnable.wrapWithCount(command, runCount), delay, unit);
+    return service.schedule(WrappedRunnable.wrap(command), delay, unit);
   }
 
   @Override
   public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-    taskCount.incrementAndGet();
-    return service.schedule(WrappedCallable.wrapWithCount(callable, runCount), delay, unit);
+    return service.schedule(WrappedCallable.wrap(callable), delay, unit);
   }
 
   @Override
   @SuppressWarnings("unsafeThreadSchedule")
   public ScheduledFuture<?> scheduleAtFixedRate(
       Runnable command, long initialDelay, long period, TimeUnit unit) {
-    taskCount.incrementAndGet();
-    return service.scheduleAtFixedRate(
-        WrappedRunnable.wrapWithCount(command, runCount), initialDelay, period, unit);
+    return service.scheduleAtFixedRate(WrappedRunnable.wrap(command), initialDelay, period, unit);
   }
 
   @Override
   @SuppressWarnings("unsafeThreadSchedule")
   public ScheduledFuture<?> scheduleWithFixedDelay(
       Runnable command, long initialDelay, long delay, TimeUnit unit) {
-    taskCount.incrementAndGet();
-    return service.scheduleWithFixedDelay(
-        WrappedRunnable.wrapWithCount(command, runCount), initialDelay, delay, unit);
+    return service.scheduleWithFixedDelay(WrappedRunnable.wrap(command), initialDelay, delay, unit);
   }
 
   @Override
@@ -114,26 +105,24 @@ public class WrappedSingleThreadScheduledExecutor
 
   @Override
   public <T> Future<T> submit(Callable<T> task) {
-    return service.submit(WrappedCallable.wrapWithCount(task, runCount));
+    return service.submit(WrappedCallable.wrap(task));
   }
 
   @Override
   public <T> Future<T> submit(Runnable task, T result) {
-    return service.submit(WrappedRunnable.wrapWithCount(task, runCount), result);
+    return service.submit(WrappedRunnable.wrap(task), result);
   }
 
   @Override
   public Future<?> submit(Runnable task) {
-    return service.submit(WrappedRunnable.wrapWithCount(task, runCount));
+    return service.submit(WrappedRunnable.wrap(task));
   }
 
   @Override
   public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
       throws InterruptedException {
     return service.invokeAll(
-        tasks.stream()
-            .map(x -> WrappedCallable.wrapWithCount(x, runCount))
-            .collect(Collectors.toList()));
+        tasks.stream().map(WrappedCallable::wrap).collect(Collectors.toList()));
   }
 
   @Override
@@ -141,85 +130,75 @@ public class WrappedSingleThreadScheduledExecutor
       Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
       throws InterruptedException {
     return service.invokeAll(
-        tasks.stream()
-            .map(x -> WrappedCallable.wrapWithCount(x, runCount))
-            .collect(Collectors.toList()),
-        timeout,
-        unit);
+        tasks.stream().map(WrappedCallable::wrap).collect(Collectors.toList()), timeout, unit);
   }
 
   @Override
   public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
       throws InterruptedException, ExecutionException {
     return service.invokeAny(
-        tasks.stream()
-            .map(x -> WrappedCallable.wrapWithCount(x, runCount))
-            .collect(Collectors.toList()));
+        tasks.stream().map(WrappedCallable::wrap).collect(Collectors.toList()));
   }
 
   @Override
   public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException {
     return service.invokeAny(
-        tasks.stream()
-            .map(x -> WrappedCallable.wrapWithCount(x, runCount))
-            .collect(Collectors.toList()),
-        timeout,
-        unit);
+        tasks.stream().map(WrappedCallable::wrap).collect(Collectors.toList()), timeout, unit);
   }
 
   @Override
   public void execute(Runnable command) {
-    service.execute(WrappedRunnable.wrapWithCount(command, runCount));
+    service.execute(WrappedRunnable.wrap(command));
   }
 
   @Override
   public int getCorePoolSize() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getCorePoolSize();
   }
 
   @Override
   public boolean prestartCoreThread() {
-    return false;
+    return ((ThreadPoolExecutor) service).prestartCoreThread();
   }
 
   @Override
   public int getMaximumPoolSize() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getMaximumPoolSize();
   }
 
   @Override
   public Queue<Runnable> getQueue() {
-    return new LinkedList<>();
+    return ((ThreadPoolExecutor) service).getQueue();
   }
 
   @Override
   public int getQueueLength() {
-    return 0;
+    return ((ThreadPoolExecutor) service).getQueue().size();
   }
 
   @Override
   public int getPoolSize() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getPoolSize();
   }
 
   @Override
   public int getActiveCount() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getActiveCount();
   }
 
   @Override
   public int getLargestPoolSize() {
-    return 1;
+    return ((ThreadPoolExecutor) service).getLargestPoolSize();
   }
 
   @Override
   public long getTaskCount() {
-    return taskCount.get();
+    return ((ThreadPoolExecutor) service).getTaskCount();
   }
 
   @Override
   public long getCompletedTaskCount() {
-    return runCount.get();
+    return ((ThreadPoolExecutor) service).getCompletedTaskCount();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index b33f2fd5e52..910f500ea1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -57,20 +57,7 @@ public class DataNodeMetricsHelper {
     MetricService.getInstance().addMetricSet(new SystemMetrics(true));
     MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.DN_ROLE));
     MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.DN_ROLE));
-    List<String> threadModules = new ArrayList<>();
-    Arrays.stream(DataNodeThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
-    List<String> pools = new ArrayList<>();
-    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
-    MetricService.getInstance()
-        .addMetricSet(
-            new CpuUsageMetrics(
-                threadModules,
-                pools,
-                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
-                x -> {
-                  ThreadName pool = ThreadName.getThreadPoolTheThreadBelongs(x);
-                  return pool == null ? "UNKNOWN" : pool.name();
-                }));
+    initCpuMetrics();
     MetricService.getInstance().addMetricSet(WritingMetrics.getInstance());
 
     // bind query related metrics
@@ -86,4 +73,18 @@ public class DataNodeMetricsHelper {
     // bind performance overview related metrics
     MetricService.getInstance().addMetricSet(PerformanceOverviewMetrics.getInstance());
   }
+
+  private static void initCpuMetrics() {
+    List<String> threadModules = new ArrayList<>();
+    Arrays.stream(DataNodeThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
+    List<String> pools = new ArrayList<>();
+    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
+    MetricService.getInstance()
+        .addMetricSet(
+            new CpuUsageMetrics(
+                threadModules,
+                pools,
+                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
+                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/ProcessMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/ProcessMetrics.java
index 1decf866280..8fb8dd8d22d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/ProcessMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/ProcessMetrics.java
@@ -34,7 +34,7 @@ public class ProcessMetrics implements IMetricSet {
   private final OperatingSystemMXBean sunOsMxBean;
   private final Runtime runtime;
   private long lastUpdateTime = 0L;
-  private static final long UPDATE_INTERVAL = 15_000L;
+  private static final long UPDATE_INTERVAL = 10_000L;
   private volatile long processCpuLoad = 0L;
   private volatile long processCpuTime = 0L;
 
@@ -69,6 +69,7 @@ public class ProcessMetrics implements IMetricSet {
           if (System.currentTimeMillis() - lastUpdateTime > UPDATE_INTERVAL) {
             lastUpdateTime = System.currentTimeMillis();
             processCpuLoad = (long) (sunOsMxBean.getProcessCpuLoad() * 100);
+            processCpuTime = sunOsMxBean.getProcessCpuTime();
           }
           return processCpuLoad;
         },
@@ -82,6 +83,7 @@ public class ProcessMetrics implements IMetricSet {
         bean -> {
           if (System.currentTimeMillis() - lastUpdateTime > UPDATE_INTERVAL) {
             lastUpdateTime = System.currentTimeMillis();
+            processCpuLoad = (long) (sunOsMxBean.getProcessCpuLoad() * 100);
             processCpuTime = sunOsMxBean.getProcessCpuTime();
           }
           return processCpuTime;


[iotdb] 02/02: edit according to review

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch cpu-monitor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fe815fc559b503d5cecc1f5fc6aeafb52520c11d
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Wed May 31 17:15:22 2023 +0800

    edit according to review
---
 .../iotdb/metrics/metricsets/cpu/CpuUsageMetrics.java    |  1 +
 .../iotdb/commons/concurrent/ThreadPoolMetrics.java      | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)

diff --git a/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/cpu/CpuUsageMetrics.java b/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/cpu/CpuUsageMetrics.java
index 3ffabea3f79..b2896d1e130 100644
--- a/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/cpu/CpuUsageMetrics.java
+++ b/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/cpu/CpuUsageMetrics.java
@@ -199,6 +199,7 @@ public class CpuUsageMetrics implements IMetricSet {
         totalIncrementTime);
     lastThreadCpuTime.clear();
     lastThreadCpuTime.putAll(currentThreadCpuTime);
+    lastThreadUserTime.clear();
     lastThreadUserTime.putAll(currentThreadUserTime);
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadPoolMetrics.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadPoolMetrics.java
index 628d1309355..6e09d7153f1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadPoolMetrics.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadPoolMetrics.java
@@ -34,6 +34,8 @@ public class ThreadPoolMetrics implements IMetricSet {
   private static final String THREAD_POOL_DONE_TASK_COUNT = "thread_pool_done_task_count";
   private static final String THREAD_POOL_WAITING_TASK_COUNT = "thread_pool_waiting_task_count";
   private static final String THREAD_POOL_CORE_SIZE = "thread_pool_core_size";
+  private static final String THREAD_POOL_MAX_POOL_SIZE = "thread_pool_max_pool_size";
+  private static final String THREAD_POOL_LARGEST_POOL_SIZE = "thread_pool_largest_pool_size";
   private static final String POOL_NAME = "pool_name";
   private AbstractMetricService metricService;
   private Map<String, IThreadPoolMBean> notRegisteredPoolMap = new HashMap<>();
@@ -79,6 +81,20 @@ public class ThreadPoolMetrics implements IMetricSet {
             map -> registeredPoolMap.get(name).getCompletedTaskCount(),
             POOL_NAME,
             name);
+        metricService.createAutoGauge(
+            THREAD_POOL_MAX_POOL_SIZE,
+            MetricLevel.IMPORTANT,
+            registeredPoolMap,
+            map -> registeredPoolMap.get(name).getMaximumPoolSize(),
+            POOL_NAME,
+            name);
+        metricService.createAutoGauge(
+            THREAD_POOL_LARGEST_POOL_SIZE,
+            MetricLevel.IMPORTANT,
+            registeredPoolMap,
+            map -> registeredPoolMap.get(name).getLargestPoolSize(),
+            POOL_NAME,
+            name);
       }
     }
   }