You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/02/19 12:16:40 UTC
[iotdb] 01/01: fix show query processlist
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch showquery
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 10c1bee5db7cf00d6d2fba1a6c7a51b257072c0b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sat Feb 19 20:15:41 2022 +0800
fix show query processlist
---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 45 +++++++++++-----------
1 file changed, 23 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 431e687..bd26ab1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -147,6 +147,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -748,17 +749,7 @@ public class TSServiceImpl implements TSIService.Iface {
statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
if (physicalPlan.isQuery()) {
- Future<TSExecuteStatementResp> resp =
- QueryTaskManager.getInstance()
- .submit(
- new QueryTask(
- physicalPlan,
- sessionManager.getUsername(req.sessionId),
- req.statement,
- req.statementId,
- req.timeout,
- req.fetchSize,
- req.enableRedirectQuery));
+ Future<TSExecuteStatementResp> resp = submitQueryTask(physicalPlan, req);
return resp.get();
} else {
return executeUpdateStatement(physicalPlan, req.getSessionId());
@@ -788,17 +779,7 @@ public class TSServiceImpl implements TSIService.Iface {
statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
if (physicalPlan.isQuery()) {
- Future<TSExecuteStatementResp> resp =
- QueryTaskManager.getInstance()
- .submit(
- new QueryTask(
- physicalPlan,
- sessionManager.getUsername(req.sessionId),
- req.statement,
- req.statementId,
- req.timeout,
- req.fetchSize,
- req.enableRedirectQuery));
+ Future<TSExecuteStatementResp> resp = submitQueryTask(physicalPlan, req);
return resp.get();
} else {
return RpcUtils.getTSExecuteStatementResp(
@@ -852,6 +833,26 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
+ private Future<TSExecuteStatementResp> submitQueryTask(
+ PhysicalPlan physicalPlan, TSExecuteStatementReq req) {
+ QueryTask queryTask =
+ new QueryTask(
+ physicalPlan,
+ sessionManager.getUsername(req.sessionId),
+ req.statement,
+ req.statementId,
+ req.timeout,
+ req.fetchSize,
+ req.enableRedirectQuery);
+ Future<TSExecuteStatementResp> resp;
+ if (physicalPlan instanceof ShowQueryProcesslistPlan) {
+ resp = Executors.newFixedThreadPool(1).submit(queryTask);
+ } else {
+ resp = QueryTaskManager.getInstance().submit(queryTask);
+ }
+ return resp;
+ }
+
/** Redirect query */
private TSExecuteStatementResp redirectQueryToAnotherNode(
TSExecuteStatementResp resp, long queryId, String ip, int port) {