You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/19 08:39:20 UTC

[iotdb] branch master updated: [IOTDB-4816] Show Queries - support antlrParse & analyze process (#8467)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a59a2c914 [IOTDB-4816] Show Queries - support antlrParse & analyze process (#8467)
4a59a2c914 is described below

commit 4a59a2c914b06c197d9707fdc2965e83e9b31180
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Mon Dec 19 16:39:13 2022 +0800

    [IOTDB-4816] Show Queries - support antlrParse & analyze process (#8467)
---
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |  6 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   | 15 +++--
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  | 20 +++++-
 .../iotdb/confignode/manager/ConfigManager.java    | 13 ++++
 .../apache/iotdb/confignode/manager/IManager.java  |  3 +
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  6 ++
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 22 +++++++
 .../ConfigNodeClientManager.java}                  | 21 ++++--
 .../db/mpp/common/header/ColumnHeaderConstant.java | 12 ++++
 .../db/mpp/common/header/DatasetHeaderFactory.java |  4 ++
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 20 +++++-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 64 +++++++++++++++++++
 .../db/mpp/plan/analyze/ExpressionAnalyzer.java    | 69 ++++++++++++++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 52 ++++++++++++++-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  9 +++
 .../db/mpp/plan/statement/StatementVisitor.java    |  5 ++
 .../db/mpp/plan/statement/component/SortKey.java   |  6 +-
 .../plan/statement/sys/ShowQueriesStatement.java   | 74 ++++++++++++++++++++++
 .../src/main/thrift/confignode.thrift              |  8 +++
 19 files changed, 412 insertions(+), 17 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 3aea385462..b15b5d5b6f 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -64,6 +64,7 @@ keyWords
     | DATA
     | DATABASE
     | DATABASES
+    | DATANODEID
     | DATANODES
     | DEACTIVATE
     | DEBUG
@@ -76,6 +77,7 @@ keyWords
     | DISABLE
     | DISCARD
     | DROP
+    | ELAPSEDTIME
     | END
     | ENDTIME
     | EVERY
@@ -137,6 +139,7 @@ keyWords
     | PRUNE
     | QUERIES
     | QUERY
+    | QUERYID
     | RANGE
     | READONLY
     | REGEXP
@@ -161,8 +164,9 @@ keyWords
     | STORAGE
     | START
     | STARTTIME
-    | STATELESS
     | STATEFUL
+    | STATELESS
+    | STATEMENT
     | STOP
     | SYSTEM
     | TAGS
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 3645cc78e1..f6599c0c7c 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -61,7 +61,7 @@ dclStatement
 utilityStatement
     : merge | fullMerge | flush | clearCache | settle | explain
     | setSystemStatus | showVersion | showFlushInfo | showLockInfo | showQueryResource
-    | showQueryProcesslist | killQuery | grantWatermarkEmbedding | revokeWatermarkEmbedding
+    | showQueries | killQuery | grantWatermarkEmbedding | revokeWatermarkEmbedding
     | loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile;
 
 syncStatement
@@ -499,6 +499,10 @@ sortKey
     : TIME
     | TIMESERIES
     | DEVICE
+    | QUERYID
+    | DATANODEID
+    | ELAPSEDTIME
+    | STATEMENT
     ;
 
 // ---- Fill Clause
@@ -726,9 +730,12 @@ showQueryResource
     : SHOW QUERY RESOURCE
     ;
 
-// Show Query Processlist
-showQueryProcesslist
-    : SHOW QUERY PROCESSLIST
+// Show Queries / Show Query Processlist
+showQueries
+    : SHOW (QUERIES | QUERY PROCESSLIST)
+    whereClause?
+    orderByClause?
+    rowPaginationClause?
     ;
 
 // Kill Query
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 972ba814b9..94a5dc88cf 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -169,6 +169,10 @@ DATABASES
     : D A T A B A S E S
     ;
 
+DATANODEID
+    : D A T A N O D E I D
+    ;
+
 DATANODES
 
     : D A T A N O D E S
@@ -222,6 +226,10 @@ DROP
     : D R O P
     ;
 
+ELAPSEDTIME
+    : E L A P S E D T I M E
+    ;
+
 END
     : E N D
     ;
@@ -470,6 +478,10 @@ QUERY
     : Q U E R Y
     ;
 
+QUERYID
+    : Q U E R Y I D
+    ;
+
 RANGE
     : R A N G E
     ;
@@ -574,12 +586,16 @@ STARTTIME
     : S T A R T T I M E
     ;
 
+STATEFUL
+    : S T A T E F U L
+    ;
+
 STATELESS
     : S T A T E L E S S
     ;
 
-STATEFUL
-    : S T A T E F U L
+STATEMENT
+    : S T A T E M E N T
     ;
 
 STOP
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 0dc8e55e5c..ca668da22e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -105,6 +105,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
@@ -968,6 +969,18 @@ public class ConfigManager implements IManager {
         : status;
   }
 
+  @Override
+  public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? new TGetDataNodeLocationsResp(
+            new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+            nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).stream()
+                .map(TDataNodeConfiguration::getLocation)
+                .collect(Collectors.toList()))
+        : new TGetDataNodeLocationsResp(status, Collections.emptyList());
+  }
+
   @Override
   public TRegionRouteMapResp getLatestRegionRouteMap() {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index d014273252..489e6acba3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
@@ -377,6 +378,8 @@ public interface IManager {
   /** TestOnly. Set the target DataNode to the specified status */
   TSStatus setDataNodeStatus(TSetDataNodeStatusReq req);
 
+  TGetDataNodeLocationsResp getRunningDataNodeLocations();
+
   /**
    * Get the latest RegionRouteMap
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 387a28e54a..f3553146d9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -96,6 +96,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
@@ -608,6 +609,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.setDataNodeStatus(req);
   }
 
+  @Override
+  public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
+    return configManager.getRunningDataNodeLocations();
+  }
+
   @Override
   public TShowRegionResp showRegion(TShowRegionReq showRegionReq) {
     GetRegionInfoListPlan getRegionInfoListPlan = new GetRegionInfoListPlan(showRegionReq);
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 49886dda15..45b0607dfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
@@ -980,6 +981,27 @@ public class ConfigNodeClient
     throw new TException("DataNode to ConfigNode client doesn't support setDataNodeStatus.");
   }
 
+  @Override
+  public TGetDataNodeLocationsResp getRunningDataNodeLocations() throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TGetDataNodeLocationsResp resp = client.getRunningDataNodeLocations();
+        if (!updateConfigNodeLeader(resp.status)) {
+          return resp;
+        }
+      } catch (TException e) {
+        logger.warn(
+            "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+            configNode,
+            config.getAddressAndPort(),
+            Thread.currentThread().getStackTrace()[1].getMethodName());
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TShowRegionResp showRegion(TShowRegionReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
copy to server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
index e4a876c85a..12df1907bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -17,10 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.statement.component;
+package org.apache.iotdb.db.client;
 
-public enum SortKey {
-  TIME,
-  TIMESERIES,
-  DEVICE
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
+
+public class ConfigNodeClientManager {
+  private static final class ConfigNodeClientManagerHolder {
+    private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient> INSTANCE =
+        new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
+            .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+  }
+
+  public static IClientManager<ConfigNodeRegionId, ConfigNodeClient> getInstance() {
+    return ConfigNodeClientManagerHolder.INSTANCE;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index ae77933ddb..c8fffdb2e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -125,6 +125,11 @@ public class ColumnHeaderConstant {
   public static final String CQID = "CQId";
   public static final String QUERY = "Query";
 
+  // column names for show query processlist
+  public static final String QUERY_ID = "QueryId";
+  public static final String ELAPSED_TIME = "ElapsedTime";
+  public static final String STATEMENT = "Statement";
+
   public static final List<ColumnHeader> lastQueryColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(TIMESERIES, TSDataType.TEXT),
@@ -329,4 +334,11 @@ public class ColumnHeaderConstant {
           new ColumnHeader(CQID, TSDataType.TEXT),
           new ColumnHeader(QUERY, TSDataType.TEXT),
           new ColumnHeader(STATE, TSDataType.TEXT));
+
+  public static final List<ColumnHeader> showQueriesColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(QUERY_ID, TSDataType.TEXT),
+          new ColumnHeader(DATA_NODE_ID, TSDataType.INT32),
+          new ColumnHeader(ELAPSED_TIME, TSDataType.FLOAT),
+          new ColumnHeader(STATEMENT, TSDataType.TEXT));
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index 00d4f7a4e5..fdc7349645 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -154,4 +154,8 @@ public class DatasetHeaderFactory {
   public static DatasetHeader getShowContinuousQueriesHeader() {
     return new DatasetHeader(ColumnHeaderConstant.showContinuousQueriesColumnHeaders, true);
   }
+
+  public static DatasetHeader getShowQueriesHeader() {
+    return new DatasetHeader(ColumnHeaderConstant.showQueriesColumnHeaders, false);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 4b08a15181..58c362f38a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.analyze;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
 import org.apache.iotdb.commons.partition.DataPartition;
@@ -37,6 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -175,7 +177,7 @@ public class Analysis {
   // Schema Query Analysis
   /////////////////////////////////////////////////////////////////////////////////////////////////
 
-  // extra mesaage from config node, used for node management
+  // extra message from config node, used for node management
   private Set<TSchemaNode> matchedNodes;
 
   // template and paths set template
@@ -187,6 +189,13 @@ public class Analysis {
   // generated by combine the input path pattern and template set path
   private List<PartialPath> specifiedTemplateRelatedPathPatternList;
 
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // Show Queries Analysis
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  // extra message from config node, queries wll be sent to these Running DataNodes
+  private List<TDataNodeLocation> runningDataNodeLocations;
+
   public Analysis() {
     this.finishQueryAfterAnalyze = false;
   }
@@ -261,6 +270,7 @@ public class Analysis {
   public boolean hasDataSource() {
     return (dataPartition != null && !dataPartition.isEmpty())
         || (schemaPartition != null && !schemaPartition.isEmpty())
+        || statement instanceof ShowQueriesStatement
         || (statement instanceof QueryStatement
             && ((QueryStatement) statement).isAggregationQuery());
   }
@@ -504,4 +514,12 @@ public class Analysis {
           tagValuesToGroupedTimeseriesOperands) {
     this.tagValuesToGroupedTimeseriesOperands = tagValuesToGroupedTimeseriesOperands;
   }
+
+  public List<TDataNodeLocation> getRunningDataNodeLocations() {
+    return runningDataNodeLocations;
+  }
+
+  public void setRunningDataNodeLocations(List<TDataNodeLocation> runningDataNodeLocations) {
+    this.runningDataNodeLocations = runningDataNodeLocations;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 8bc1b3fa45..31eaa3256b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.plan.analyze;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -29,6 +30,10 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -42,6 +47,7 @@ import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
@@ -104,6 +110,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTempl
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
@@ -124,6 +131,7 @@ import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -2475,4 +2483,60 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     analysis.setFinishQueryAfterAnalyze(true);
     return analysis;
   }
+
+  @Override
+  public Analysis visitShowQueries(
+      ShowQueriesStatement showQueriesStatement, MPPQueryContext context) {
+    Analysis analysis = new Analysis();
+    analysis.setStatement(showQueriesStatement);
+    analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowQueriesHeader());
+
+    List<TDataNodeLocation> allRunningDataNodeLocations = getRunningDataNodeLocations();
+    if (allRunningDataNodeLocations.isEmpty()) {
+      analysis.setFinishQueryAfterAnalyze(true);
+    }
+    // TODO Constant folding optimization for Where Predicate after True/False Constant introduced
+    analysis.setRunningDataNodeLocations(allRunningDataNodeLocations);
+
+    analyzeWhere(analysis, showQueriesStatement);
+
+    return analysis;
+  }
+
+  private List<TDataNodeLocation> getRunningDataNodeLocations() {
+    try (ConfigNodeClient client =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      TGetDataNodeLocationsResp showDataNodesResp = client.getRunningDataNodeLocations();
+      if (showDataNodesResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new StatementAnalyzeException(
+            "An error occurred when executing getRunningDataNodeLocations():"
+                + showDataNodesResp.getStatus().getMessage());
+      }
+      return showDataNodesResp.getDataNodeLocationList();
+    } catch (TException | IOException e) {
+      throw new StatementAnalyzeException(
+          "An error occurred when executing getRunningDataNodeLocations():" + e.getMessage());
+    }
+  }
+
+  private void analyzeWhere(Analysis analysis, ShowQueriesStatement showQueriesStatement) {
+    WhereCondition whereCondition = showQueriesStatement.getWhereCondition();
+    if (whereCondition == null) {
+      return;
+    }
+
+    Expression whereExpression =
+        ExpressionAnalyzer.bindTypeForTimeSeriesOperand(
+            whereCondition.getPredicate(), ColumnHeaderConstant.showQueriesColumnHeaders);
+
+    TSDataType outputType = analyzeExpression(analysis, whereExpression);
+    if (outputType != TSDataType.BOOLEAN) {
+      throw new SemanticException(
+          String.format(
+              "The output type of the expression in WHERE clause should be BOOLEAN, actual data type: %s.",
+              outputType));
+    }
+
+    analysis.setWhereExpression(whereExpression);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index cd8143ea08..91410320f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.db.mpp.plan.analyze;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
@@ -1224,4 +1226,71 @@ public class ExpressionAnalyzer {
     }
     return predicate;
   }
+
+  /**
+   * Bind DataType for TimeSeriesOperand in Expression with according ColumnHeaderName. eg:
+   *
+   * <p>columnHeaders: [[QueryId, TEXT], [DataNodeId, INT32]...]
+   *
+   * <p>dataNodeID > 1 -> DataNodeId > 1, `DataNodeID` will be a MeasurementPath with INT32
+   *
+   * <p>errorInput > 1, no according ColumnHeaderName of `errorInput`, throw exception
+   */
+  public static Expression bindTypeForTimeSeriesOperand(
+      Expression predicate, List<ColumnHeader> columnHeaders) {
+    if (predicate instanceof TernaryExpression) {
+      Expression firstExpression =
+          bindTypeForTimeSeriesOperand(
+              ((TernaryExpression) predicate).getFirstExpression(), columnHeaders);
+      Expression secondExpression =
+          bindTypeForTimeSeriesOperand(
+              ((TernaryExpression) predicate).getSecondExpression(), columnHeaders);
+      Expression thirdExpression =
+          bindTypeForTimeSeriesOperand(
+              ((TernaryExpression) predicate).getThirdExpression(), columnHeaders);
+      return reconstructTernaryExpression(
+          predicate, firstExpression, secondExpression, thirdExpression);
+    } else if (predicate instanceof BinaryExpression) {
+      Expression leftExpression =
+          bindTypeForTimeSeriesOperand(
+              ((BinaryExpression) predicate).getLeftExpression(), columnHeaders);
+      Expression rightExpression =
+          bindTypeForTimeSeriesOperand(
+              ((BinaryExpression) predicate).getRightExpression(), columnHeaders);
+      return reconstructBinaryExpression(
+          predicate.getExpressionType(), leftExpression, rightExpression);
+    } else if (predicate instanceof UnaryExpression) {
+      Expression expression =
+          bindTypeForTimeSeriesOperand(
+              ((UnaryExpression) predicate).getExpression(), columnHeaders);
+      return reconstructUnaryExpression((UnaryExpression) predicate, expression);
+    } else if (predicate instanceof FunctionExpression) {
+      List<Expression> expressions = predicate.getExpressions();
+      List<Expression> childrenExpressions = new ArrayList<>();
+      for (Expression expression : expressions) {
+        childrenExpressions.add(bindTypeForTimeSeriesOperand(expression, columnHeaders));
+      }
+      return reconstructFunctionExpression((FunctionExpression) predicate, childrenExpressions);
+    } else if (predicate instanceof TimeSeriesOperand) {
+      String oldPathString = ((TimeSeriesOperand) predicate).getPath().getFullPath();
+      // There are not too many TimeSeriesOperand and columnHeaders in our case,
+      // so we use `for loop` instead of map to get the matched columnHeader for oldPath here.
+      for (ColumnHeader columnHeader : columnHeaders) {
+        if (oldPathString.equalsIgnoreCase(columnHeader.getColumnName())) {
+          try {
+            return reconstructTimeSeriesOperand(
+                new MeasurementPath(columnHeader.getColumnName(), columnHeader.getColumnType()));
+          } catch (IllegalPathException ignored) {
+          }
+        }
+      }
+      throw new SemanticException(
+          String.format("please ensure input[%s] is correct", oldPathString));
+    } else if (predicate instanceof LeafOperand) {
+      return predicate;
+    } else {
+      throw new IllegalArgumentException(
+          "unsupported expression type: " + predicate.getExpressionType());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 672f741e77..ec086136b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -145,6 +145,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
@@ -180,6 +181,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import com.google.common.collect.ImmutableSet;
+
 import java.io.FileNotFoundException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -966,7 +969,10 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
 
     // parse ORDER BY
     if (ctx.orderByClause() != null) {
-      queryStatement.setOrderByComponent(parseOrderByClause(ctx.orderByClause()));
+      queryStatement.setOrderByComponent(
+          parseOrderByClause(
+              ctx.orderByClause(),
+              ImmutableSet.of(SortKey.TIME, SortKey.DEVICE, SortKey.TIMESERIES)));
     }
 
     // parse FILL
@@ -1194,7 +1200,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
   }
 
   // ---- Order By Clause
-  private OrderByComponent parseOrderByClause(IoTDBSqlParser.OrderByClauseContext ctx) {
+  // all SortKeys should be contained by limitSet
+  private OrderByComponent parseOrderByClause(
+      IoTDBSqlParser.OrderByClauseContext ctx, ImmutableSet<SortKey> limitSet) {
     OrderByComponent orderByComponent = new OrderByComponent();
     Set<SortKey> sortKeySet = new HashSet<>();
     for (IoTDBSqlParser.OrderByAttributeClauseContext orderByAttributeClauseContext :
@@ -1202,6 +1210,10 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       SortItem sortItem = parseOrderByAttributeClause(orderByAttributeClauseContext);
 
       SortKey sortKey = sortItem.getSortKey();
+      if (!limitSet.contains(sortKey)) {
+        throw new SemanticException(
+            String.format("ORDER BY: sort key[%s] is not contained in '%s'", sortKey, limitSet));
+      }
       if (sortKeySet.contains(sortKey)) {
         throw new SemanticException(String.format("ORDER BY: duplicate sort key '%s'", sortKey));
       } else {
@@ -2537,6 +2549,42 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     return setSystemStatusStatement;
   }
 
+  // show query processlist
+
+  @Override
+  public Statement visitShowQueries(IoTDBSqlParser.ShowQueriesContext ctx) {
+    ShowQueriesStatement showQueriesStatement = new ShowQueriesStatement();
+    // parse WHERE
+    if (ctx.whereClause() != null) {
+      showQueriesStatement.setWhereCondition(parseWhereClause(ctx.whereClause()));
+    }
+
+    // parse ORDER BY
+    if (ctx.orderByClause() != null) {
+      showQueriesStatement.setOrderByComponent(
+          parseOrderByClause(
+              ctx.orderByClause(),
+              ImmutableSet.of(
+                  SortKey.TIME,
+                  SortKey.QUERYID,
+                  SortKey.DATANODEID,
+                  SortKey.ELAPSEDTIME,
+                  SortKey.STATEMENT)));
+    }
+
+    // parse LIMIT & OFFSET
+    if (ctx.rowPaginationClause() != null) {
+      if (ctx.rowPaginationClause().limitClause() != null) {
+        showQueriesStatement.setRowLimit(parseLimitClause(ctx.rowPaginationClause().limitClause()));
+      }
+      if (ctx.rowPaginationClause().offsetClause() != null) {
+        showQueriesStatement.setRowOffset(
+            parseOffsetClause(ctx.rowPaginationClause().offsetClause()));
+      }
+    }
+    return showQueriesStatement;
+  }
+
   // show region
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index d9a458b4c4..7dd495ab78 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -66,6 +66,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -706,4 +707,12 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
             .planSchemaQueryMerge(false);
     return planBuilder.getRoot();
   }
+
+  @Override
+  public PlanNode visitShowQueries(
+      ShowQueriesStatement showQueriesStatement, MPPQueryContext context) {
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+    // TODO planBuilder = planBuilder.planShowQueries()
+    return planBuilder.getRoot();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 56f39db6d6..cfc7ce8e26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -84,6 +84,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
@@ -318,6 +319,10 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(setSystemStatusStatement, context);
   }
 
+  public R visitShowQueries(ShowQueriesStatement showQueriesStatement, C context) {
+    return visitStatement(showQueriesStatement, context);
+  }
+
   public R visitShowRegion(ShowRegionStatement showRegionStatement, C context) {
     return visitStatement(showRegionStatement, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
index e4a876c85a..b39d16fbba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/SortKey.java
@@ -22,5 +22,9 @@ package org.apache.iotdb.db.mpp.plan.statement.component;
 public enum SortKey {
   TIME,
   TIMESERIES,
-  DEVICE
+  DEVICE,
+  QUERYID,
+  DATANODEID,
+  ELAPSEDTIME,
+  STATEMENT
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
new file mode 100644
index 0000000000..33d492b1b1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys;
+
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStatement;
+
+public class ShowQueriesStatement extends ShowStatement {
+
+  private WhereCondition whereCondition;
+
+  private OrderByComponent orderByComponent;
+
+  private int rowLimit;
+  private int rowOffset;
+
+  public ShowQueriesStatement() {}
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitShowQueries(this, context);
+  }
+
+  public void setWhereCondition(WhereCondition whereCondition) {
+    this.whereCondition = whereCondition;
+  }
+
+  public WhereCondition getWhereCondition() {
+    return whereCondition;
+  }
+
+  public void setOrderByComponent(OrderByComponent orderByComponent) {
+    this.orderByComponent = orderByComponent;
+  }
+
+  public OrderByComponent getOrderByComponent() {
+    return orderByComponent;
+  }
+
+  public void setRowLimit(int rowLimit) {
+    this.rowLimit = rowLimit;
+  }
+
+  public int getRowLimit() {
+    return rowLimit;
+  }
+
+  public void setRowOffset(int rowOffset) {
+    this.rowOffset = rowOffset;
+  }
+
+  public int getRowOffset() {
+    return rowOffset;
+  }
+}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 6849625e51..530ad15855 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -411,6 +411,11 @@ struct TGetJarInListResp {
   2: required list<binary> jarList
 }
 
+struct TGetDataNodeLocationsResp {
+  1: required common.TSStatus status
+  2: required list<common.TDataNodeLocation> dataNodeLocationList
+}
+
 // Show cluster
 struct TShowClusterResp {
   1: required common.TSStatus status
@@ -963,6 +968,9 @@ service IConfigNodeRPCService {
   /** Migrate a region replica from one dataNode to another */
   common.TSStatus migrateRegion(TMigrateRegionReq req)
 
+  /** Get all DataNodeLocations of Running DataNodes */
+  TGetDataNodeLocationsResp getRunningDataNodeLocations()
+
   // ======================================================
   // Cluster Tools
   // ======================================================