You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/07/27 07:15:56 UTC

[incubator-iotdb] branch feature_add_flush_queue_jmx_interface created (now 21bd10c)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch feature_add_flush_queue_jmx_interface
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 21bd10c  add jmx interface for getting the active and pending tasks in FlushManager

This branch includes the following new commits:

     new 21bd10c  add jmx interface for getting the active and pending tasks in FlushManager

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: add jmx interface for getting the active and pending tasks in FlushManager

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_add_flush_queue_jmx_interface
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 21bd10cd537a37b7099ea7f9d89d39e53b4e644d
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Jul 27 15:15:40 2019 +0800

    add jmx interface for getting the active and pending tasks in FlushManager
---
 .../apache/iotdb/db/engine/flush/FlushManager.java | 32 +++++++++++++++++++++-
 .../iotdb/db/engine/flush/FlushManagerMBean.java   | 13 +++++++++
 .../db/engine/flush/pool/AbstractPoolManager.java  |  8 ++++++
 .../engine/flush/pool/FlushSubTaskPoolManager.java |  1 -
 .../db/engine/flush/pool/FlushTaskPoolManager.java |  1 +
 .../org/apache/iotdb/db/service/ServiceType.java   |  9 +++++-
 6 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index ff5136f..46e86d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -19,16 +19,18 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FlushManager implements IService {
+public class FlushManager implements FlushManagerMBean, IService {
 
   private static final Logger logger = LoggerFactory.getLogger(FlushManager.class);
 
@@ -40,6 +42,14 @@ public class FlushManager implements IService {
   public void start() throws StartupException {
     FlushSubTaskPoolManager.getInstance().start();
     FlushTaskPoolManager.getInstance().start();
+    try {
+      JMXService.registerMBean(this, ServiceType.FLUSH_SERVICE.getJmxName());
+    } catch (Exception e) {
+      String errorMessage = String
+          .format("Failed to start %s because of %s", this.getID().getName(),
+              e.getMessage());
+      throw new StartupException(errorMessage, e);
+    }
   }
 
   @Override
@@ -53,6 +63,26 @@ public class FlushManager implements IService {
     return ServiceType.FLUSH_SERVICE;
   }
 
+  @Override
+  public int getNumberOfWorkingTasks() {
+    return flushPool.getNumberOfWorkingTasks();
+  }
+
+  @Override
+  public int getNumberOfPendingTasks() {
+    return flushPool.getNumberOfPendingTasks();
+  }
+
+  @Override
+  public int getNumberOfWorkingSubTasks() {
+    return FlushSubTaskPoolManager.getInstance().getNumberOfWorkingTasks();
+  }
+
+  @Override
+  public int getNumberOfPendingSubTasks() {
+    return FlushSubTaskPoolManager.getInstance().getNumberOfPendingTasks();
+  }
+
   class FlushThread implements Runnable {
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java
new file mode 100644
index 0000000..7da3cc6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java
@@ -0,0 +1,13 @@
+package org.apache.iotdb.db.engine.flush;
+
+public interface FlushManagerMBean {
+
+  public int getNumberOfWorkingTasks();
+
+  public int getNumberOfPendingTasks();
+
+  public int getNumberOfWorkingSubTasks();
+
+  public int getNumberOfPendingSubTasks();
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index d1a3dcc..8800bf9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -84,4 +84,12 @@ public abstract class AbstractPoolManager {
   public abstract void stop();
 
   public abstract String getName();
+
+  public int getNumberOfWorkingTasks() {
+    return ((ThreadPoolExecutor)pool).getActiveCount();
+  }
+
+  public int getNumberOfPendingTasks() {
+    return ((ThreadPoolExecutor)pool).getQueue().size();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index 94eff1b..2264d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.flush.pool;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.service.IService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
index aec372c..0d02b2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
@@ -54,6 +54,7 @@ public class FlushTaskPoolManager extends AbstractPoolManager {
       pool = IoTDBThreadPoolFactory
           .newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
     }
+
     LOGGER.info("Flush task manager started.");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index bff3e49..b2f245b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -33,7 +33,9 @@ public enum ServiceType {
   SYNC_SERVICE("SYNC ServerService", ""),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE","PERFORMANCE_STATISTIC_SERVICE"),
   TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
-  FLUSH_SERVICE("Flush ServerService", "");
+
+  FLUSH_SERVICE("Flush ServerService",
+      generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager"));
 
   private String name;
   private String jmxName;
@@ -50,4 +52,9 @@ public enum ServiceType {
   public String getJmxName() {
     return jmxName;
   }
+
+  private static String generateJmxName(String packageName,  String jmxName) {
+    return String
+        .format("%s:type=%s", packageName, jmxName);
+  }
 }