You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/02/19 12:16:40 UTC

[iotdb] 01/01: fix show query processlist

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch showquery
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 10c1bee5db7cf00d6d2fba1a6c7a51b257072c0b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sat Feb 19 20:15:41 2022 +0800

    fix show query processlist
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 45 +++++++++++-----------
 1 file changed, 23 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 431e687..bd26ab1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -147,6 +147,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -748,17 +749,7 @@ public class TSServiceImpl implements TSIService.Iface {
               statement, sessionManager.getZoneId(req.getSessionId()), req.fetchSize);
 
       if (physicalPlan.isQuery()) {
-        Future<TSExecuteStatementResp> resp =
-            QueryTaskManager.getInstance()
-                .submit(
-                    new QueryTask(
-                        physicalPlan,
-                        sessionManager.getUsername(req.sessionId),
-                        req.statement,
-                        req.statementId,
-                        req.timeout,
-                        req.fetchSize,
-                        req.enableRedirectQuery));
+        Future<TSExecuteStatementResp> resp = submitQueryTask(physicalPlan, req);
         return resp.get();
       } else {
         return executeUpdateStatement(physicalPlan, req.getSessionId());
@@ -788,17 +779,7 @@ public class TSServiceImpl implements TSIService.Iface {
               statement, sessionManager.getZoneId(req.sessionId), req.fetchSize);
 
       if (physicalPlan.isQuery()) {
-        Future<TSExecuteStatementResp> resp =
-            QueryTaskManager.getInstance()
-                .submit(
-                    new QueryTask(
-                        physicalPlan,
-                        sessionManager.getUsername(req.sessionId),
-                        req.statement,
-                        req.statementId,
-                        req.timeout,
-                        req.fetchSize,
-                        req.enableRedirectQuery));
+        Future<TSExecuteStatementResp> resp = submitQueryTask(physicalPlan, req);
         return resp.get();
       } else {
         return RpcUtils.getTSExecuteStatementResp(
@@ -852,6 +833,26 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
+  private Future<TSExecuteStatementResp> submitQueryTask(
+      PhysicalPlan physicalPlan, TSExecuteStatementReq req) {
+    QueryTask queryTask =
+        new QueryTask(
+            physicalPlan,
+            sessionManager.getUsername(req.sessionId),
+            req.statement,
+            req.statementId,
+            req.timeout,
+            req.fetchSize,
+            req.enableRedirectQuery);
+    Future<TSExecuteStatementResp> resp;
+    if (physicalPlan instanceof ShowQueryProcesslistPlan) {
+      resp = Executors.newFixedThreadPool(1).submit(queryTask);
+    } else {
+      resp = QueryTaskManager.getInstance().submit(queryTask);
+    }
+    return resp;
+  }
+
   /** Redirect query */
   private TSExecuteStatementResp redirectQueryToAnotherNode(
       TSExecuteStatementResp resp, long queryId, String ip, int port) {