You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/07/24 06:16:16 UTC
[incubator-iotdb] 01/01: add cli query flush info
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch manage_flush_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit fd7f5b692adfb359342399397d4b84e294511eb0
Author: lta <li...@163.com>
AuthorDate: Wed Jul 24 12:26:23 2019 +0800
add cli query flush info
---
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 2 ++
.../apache/iotdb/db/engine/flush/FlushManager.java | 1 -
.../db/engine/flush/pool/AbstractPoolManager.java | 5 +++-
.../engine/flush/pool/FlushSubTaskPoolManager.java | 4 +--
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 29 +++++++++++++++++++---
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 ++-
7 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index a4779af..383a350 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -57,6 +57,8 @@ public class IoTDBConstant {
// for cluster, set read consistency level
public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+read.*level.*";
+ public static final String SHOW_FLUSH_TASK_INFO = "show\\s+flush\\s+task\\s+info";
+
public static final String ROLE = "Role";
public static final String USER = "User";
public static final String PRIVILEGE = "Privilege";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index f996c18..ff5136f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -94,5 +94,4 @@ public class FlushManager implements IService {
private static FlushManager instance = new FlushManager();
}
-
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index cd11ec1..d1a3dcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.exception.StartupException;
import org.slf4j.Logger;
public abstract class AbstractPoolManager {
@@ -70,6 +69,10 @@ public abstract class AbstractPoolManager {
return ((ThreadPoolExecutor) pool).getQueue().size();
}
+ public int getTotalTasks() {
+ return getActiveCnt() + getWaitingTasksNumber();
+ }
+
public int getCorePoolSize() {
return ((ThreadPoolExecutor) pool).getCorePoolSize();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index 448fb49..94eff1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -55,7 +55,7 @@ public class FlushSubTaskPoolManager extends AbstractPoolManager {
this.pool = IoTDBThreadPoolFactory
.newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
}
- LOGGER.info("Flush encoding sub task manager started.");
+ LOGGER.info("Flush sub task manager started.");
}
@Override
@@ -64,7 +64,7 @@ public class FlushSubTaskPoolManager extends AbstractPoolManager {
close();
pool = null;
}
- LOGGER.info("Flush encoding sub task manager stopped");
+ LOGGER.info("Flush sub task manager stopped");
}
private static class InstanceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0c70887..1c9b2a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -91,7 +91,6 @@ public class IoTDB implements IoTDBMBean {
}
initMManager();
- registerManager.register(FlushManager.getInstance());
registerManager.register(StorageEngine.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
registerManager.register(JMXService.getInstance());
@@ -101,6 +100,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(Measurement.INSTANCE);
registerManager.register(SyncServerManager.getInstance());
registerManager.register(TVListAllocator.getInstance());
+ registerManager.register(FlushManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 06e857b..28b40de 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.exception.ArgsErrorException;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -411,7 +412,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
@Override
public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
- String currStmt = null;
List<Integer> result = new ArrayList<>();
try {
if (!checkLogin()) {
@@ -425,7 +425,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
for (String statement : statements) {
long t2 = System.currentTimeMillis();
- currStmt = statement;
isAllSuccessful =
isAllSuccessful && executeStatementInBatch(statement, batchErrorMessage, result);
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
@@ -491,8 +490,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS");
}
+ if (execShowFlushInfo(statement)) {
+ String msg = String.format(
+ "There are %d flush tasks, %d flush tasks are in execution and %d flush tasks are waiting for execution.",
+ FlushTaskPoolManager.getInstance().getTotalTasks(),
+ FlushTaskPoolManager.getInstance().getActiveCnt(),
+ FlushTaskPoolManager.getInstance().getWaitingTasksNumber());
+ return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS, msg);
+ }
+
if (execSetConsistencyLevel(statement)) {
- return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
+ return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS,
"Execute set consistency level successfully");
}
@@ -516,6 +524,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
/**
* Set consistency level
*/
+ private boolean execShowFlushInfo(String statement) {
+ if (statement == null) {
+ return false;
+ }
+ statement = statement.toLowerCase().trim();
+ if (Pattern.matches(IoTDBConstant.SHOW_FLUSH_TASK_INFO, statement)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Set consistency level
+ */
private boolean execSetConsistencyLevel(String statement) throws SQLException {
if (statement == null) {
return false;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 78a846b..3a6a255 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
+import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
@@ -160,11 +161,12 @@ public class EnvironmentUtils {
}
StorageEngine.getInstance().reset();
MultiFileLogNodeManager.getInstance().start();
+ FlushManager.getInstance().start();
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
}
- private static void createAllDir() throws IOException {
+ private static void createAllDir() {
// create sequential files
for (String path : directoryManager.getAllSequenceFileFolders()) {
createDir(path);