You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/07/24 06:16:15 UTC

[incubator-iotdb] branch manage_flush_pool updated (3df5bff -> fd7f5b6)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch manage_flush_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


 discard 3df5bff  add cli query flush info
     new fd7f5b6  add cli query flush info

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3df5bff)
            \
             N -- N -- N   refs/heads/manage_flush_pool (fd7f5b6)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 server/iotdb/data/system/schema/system.properties |   3 ---
 server/iotdb/data/system/users/root.profile       | Bin 50 -> 0 bytes
 2 files changed, 3 deletions(-)
 delete mode 100644 server/iotdb/data/system/schema/system.properties
 delete mode 100644 server/iotdb/data/system/users/root.profile


[incubator-iotdb] 01/01: add cli query flush info

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch manage_flush_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit fd7f5b692adfb359342399397d4b84e294511eb0
Author: lta <li...@163.com>
AuthorDate: Wed Jul 24 12:26:23 2019 +0800

    add cli query flush info
---
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |  2 ++
 .../apache/iotdb/db/engine/flush/FlushManager.java |  1 -
 .../db/engine/flush/pool/AbstractPoolManager.java  |  5 +++-
 .../engine/flush/pool/FlushSubTaskPoolManager.java |  4 +--
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  2 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 29 +++++++++++++++++++---
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  4 ++-
 7 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index a4779af..383a350 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -57,6 +57,8 @@ public class IoTDBConstant {
   // for cluster, set read consistency level
   public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+read.*level.*";
 
+  public static final String SHOW_FLUSH_TASK_INFO = "show\\s+flush\\s+task\\s+info";
+
   public static final String ROLE = "Role";
   public static final String USER = "User";
   public static final String PRIVILEGE = "Privilege";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index f996c18..ff5136f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -94,5 +94,4 @@ public class FlushManager implements IService {
 
     private static FlushManager instance = new FlushManager();
   }
-
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index cd11ec1..d1a3dcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.exception.StartupException;
 import org.slf4j.Logger;
 
 public abstract class AbstractPoolManager {
@@ -70,6 +69,10 @@ public abstract class AbstractPoolManager {
     return ((ThreadPoolExecutor) pool).getQueue().size();
   }
 
+  public int getTotalTasks() {
+    return getActiveCnt() + getWaitingTasksNumber();
+  }
+
   public int getCorePoolSize() {
     return ((ThreadPoolExecutor) pool).getCorePoolSize();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index 448fb49..94eff1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -55,7 +55,7 @@ public class FlushSubTaskPoolManager extends AbstractPoolManager {
       this.pool = IoTDBThreadPoolFactory
           .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
     }
-    LOGGER.info("Flush encoding sub task manager started.");
+    LOGGER.info("Flush sub task manager started.");
   }
 
   @Override
@@ -64,7 +64,7 @@ public class FlushSubTaskPoolManager extends AbstractPoolManager {
       close();
       pool = null;
     }
-    LOGGER.info("Flush encoding sub task manager stopped");
+    LOGGER.info("Flush sub task manager stopped");
   }
 
   private static class InstanceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0c70887..1c9b2a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -91,7 +91,6 @@ public class IoTDB implements IoTDBMBean {
     }
 
     initMManager();
-    registerManager.register(FlushManager.getInstance());
     registerManager.register(StorageEngine.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
     registerManager.register(JMXService.getInstance());
@@ -101,6 +100,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(Measurement.INSTANCE);
     registerManager.register(SyncServerManager.getInstance());
     registerManager.register(TVListAllocator.getInstance());
+    registerManager.register(FlushManager.getInstance());
 
     JMXService.registerMBean(getInstance(), mbeanName);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 06e857b..28b40de 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.exception.ArgsErrorException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -411,7 +412,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
-    String currStmt = null;
     List<Integer> result = new ArrayList<>();
     try {
       if (!checkLogin()) {
@@ -425,7 +425,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       for (String statement : statements) {
         long t2 = System.currentTimeMillis();
-        currStmt = statement;
         isAllSuccessful =
             isAllSuccessful && executeStatementInBatch(statement, batchErrorMessage, result);
         Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
@@ -491,8 +490,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS");
       }
 
+      if (execShowFlushInfo(statement)) {
+        String msg = String.format(
+            "There are %d flush tasks, %d flush tasks are in execution and %d flush tasks are waiting for execution.",
+            FlushTaskPoolManager.getInstance().getTotalTasks(),
+            FlushTaskPoolManager.getInstance().getActiveCnt(),
+            FlushTaskPoolManager.getInstance().getWaitingTasksNumber());
+        return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS, msg);
+      }
+
       if (execSetConsistencyLevel(statement)) {
-        return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
+        return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS,
             "Execute set consistency level successfully");
       }
 
@@ -516,6 +524,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   /**
    * Set consistency level
    */
+  private boolean execShowFlushInfo(String statement) {
+    if (statement == null) {
+      return false;
+    }
+    statement = statement.toLowerCase().trim();
+    if (Pattern.matches(IoTDBConstant.SHOW_FLUSH_TASK_INFO, statement)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Set consistency level
+   */
   private boolean execSetConsistencyLevel(String statement) throws SQLException {
     if (statement == null) {
       return false;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 78a846b..3a6a255 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
 import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
+import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -160,11 +161,12 @@ public class EnvironmentUtils {
     }
     StorageEngine.getInstance().reset();
     MultiFileLogNodeManager.getInstance().start();
+    FlushManager.getInstance().start();
     TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
   }
 
-  private static void createAllDir() throws IOException {
+  private static void createAllDir() {
     // create sequential files
     for (String path : directoryManager.getAllSequenceFileFolders()) {
       createDir(path);