You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/07/24 07:05:49 UTC

[incubator-iotdb] 02/02: add cli query flush info

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 9816b20310afd071225ec9f9afc18124e2df4fa4
Author: lta <li...@163.com>
AuthorDate: Wed Jul 24 12:26:23 2019 +0800

    add cli query flush info
---
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |  2 ++
 .../apache/iotdb/db/engine/flush/FlushManager.java |  1 -
 .../db/engine/flush/pool/AbstractPoolManager.java  |  5 +++-
 .../engine/flush/pool/FlushSubTaskPoolManager.java |  4 +--
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  2 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 29 +++++++++++++++++++---
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  4 ++-
 7 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index a4779af..383a350 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -57,6 +57,8 @@ public class IoTDBConstant {
   // for cluster, set read consistency level
   public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+read.*level.*";
 
+  public static final String SHOW_FLUSH_TASK_INFO = "show\\s+flush\\s+task\\s+info";
+
   public static final String ROLE = "Role";
   public static final String USER = "User";
   public static final String PRIVILEGE = "Privilege";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index f996c18..ff5136f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -94,5 +94,4 @@ public class FlushManager implements IService {
 
     private static FlushManager instance = new FlushManager();
   }
-
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index cd11ec1..d1a3dcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.exception.StartupException;
 import org.slf4j.Logger;
 
 public abstract class AbstractPoolManager {
@@ -70,6 +69,10 @@ public abstract class AbstractPoolManager {
     return ((ThreadPoolExecutor) pool).getQueue().size();
   }
 
+  public int getTotalTasks() { // presumably running tasks plus tasks still queued in the pool
+    return getActiveCnt() + getWaitingTasksNumber(); // two separate reads, not one snapshot
+  }
+
   public int getCorePoolSize() {
     return ((ThreadPoolExecutor) pool).getCorePoolSize();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index 448fb49..94eff1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -55,7 +55,7 @@ public class FlushSubTaskPoolManager extends AbstractPoolManager {
       this.pool = IoTDBThreadPoolFactory
           .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
     }
-    LOGGER.info("Flush encoding sub task manager started.");
+    LOGGER.info("Flush sub task manager started.");
   }
 
   @Override
@@ -64,7 +64,7 @@ public class FlushSubTaskPoolManager extends AbstractPoolManager {
       close();
       pool = null;
     }
-    LOGGER.info("Flush encoding sub task manager stopped");
+    LOGGER.info("Flush sub task manager stopped");
   }
 
   private static class InstanceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0c70887..1c9b2a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -91,7 +91,6 @@ public class IoTDB implements IoTDBMBean {
     }
 
     initMManager();
-    registerManager.register(FlushManager.getInstance());
     registerManager.register(StorageEngine.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
     registerManager.register(JMXService.getInstance());
@@ -101,6 +100,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(Measurement.INSTANCE);
     registerManager.register(SyncServerManager.getInstance());
     registerManager.register(TVListAllocator.getInstance());
+    registerManager.register(FlushManager.getInstance());
 
     JMXService.registerMBean(getInstance(), mbeanName);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 06e857b..28b40de 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.exception.ArgsErrorException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -411,7 +412,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
-    String currStmt = null;
     List<Integer> result = new ArrayList<>();
     try {
       if (!checkLogin()) {
@@ -425,7 +425,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       for (String statement : statements) {
         long t2 = System.currentTimeMillis();
-        currStmt = statement;
         isAllSuccessful =
             isAllSuccessful && executeStatementInBatch(statement, batchErrorMessage, result);
         Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
@@ -491,8 +490,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS");
       }
 
+      if (execShowFlushInfo(statement)) {
+        String msg = String.format(
+            "There are %d flush tasks, %d flush tasks are in execution and %d flush tasks are waiting for execution.",
+            FlushTaskPoolManager.getInstance().getTotalTasks(),
+            FlushTaskPoolManager.getInstance().getActiveCnt(),
+            FlushTaskPoolManager.getInstance().getWaitingTasksNumber());
+        return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS, msg);
+      }
+
       if (execSetConsistencyLevel(statement)) {
-        return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
+        return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS,
             "Execute set consistency level successfully");
       }
 
@@ -516,6 +524,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   /**
    * Set consistency level
    */
+  private boolean execShowFlushInfo(String statement) {
+    // Checks for the "show flush task info" command; the "Set consistency level" Javadoc
+    // above describes execSetConsistencyLevel, not this method. Null never matches.
+    if (statement == null) {
+      return false;
+    }
+    // Lower-case with Locale.ROOT so matching does not depend on the JVM default locale
+    // (under a Turkish locale "INFO" lower-cases with a dotless i and would not match).
+    String normalized = statement.toLowerCase(java.util.Locale.ROOT).trim();
+    return Pattern.matches(IoTDBConstant.SHOW_FLUSH_TASK_INFO, normalized);
+  }
+
+  /**
+   * Set consistency level
+   */
   private boolean execSetConsistencyLevel(String statement) throws SQLException {
     if (statement == null) {
       return false;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 78a846b..3a6a255 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
 import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
+import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -160,11 +161,12 @@ public class EnvironmentUtils {
     }
     StorageEngine.getInstance().reset();
     MultiFileLogNodeManager.getInstance().start();
+    FlushManager.getInstance().start();
     TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
   }
 
-  private static void createAllDir() throws IOException {
+  private static void createAllDir() {
     // create sequential files
     for (String path : directoryManager.getAllSequenceFileFolders()) {
       createDir(path);