You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/30 07:00:51 UTC

[iotdb] branch master updated: [IOTDB-4816]Show queries - implement LogicPlan & DistributionPlan & Operator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 82ffaa3c26 [IOTDB-4816]Show queries -  implement LogicPlan & DistributionPlan & Operator
82ffaa3c26 is described below

commit 82ffaa3c268577f34f59b2b3b323910625904753
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Fri Dec 30 15:00:46 2022 +0800

    [IOTDB-4816]Show queries -  implement LogicPlan & DistributionPlan & Operator
---
 .../execution/operator/process/SortOperator.java   |  83 ++++++-
 .../process/join/merge/MergeSortComparator.java    | 110 ++++++++--
 .../operator/source/ShowQueriesOperator.java       | 140 ++++++++++++
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |   6 +
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  11 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  22 ++
 .../db/mpp/plan/execution/IQueryExecution.java     |   2 +
 .../db/mpp/plan/execution/QueryExecution.java      |   5 +
 .../mpp/plan/execution/config/ConfigExecution.java |   5 +
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |   2 +
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  62 ++++++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |   6 +-
 .../plan/planner/MemoryDistributionCalculator.java |   7 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  84 ++++++-
 .../planner/distribution/DistributionPlanner.java  |   4 +-
 .../planner/distribution/ExchangeNodeAdder.java    |  24 ++
 .../SimpleFragmentParallelPlanner.java             |  27 ++-
 .../plan/planner/distribution/SourceRewriter.java  |   2 +-
 .../db/mpp/plan/planner/plan/PlanFragment.java     |  24 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  10 +-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../plan/planner/plan/node/process/SortNode.java   |  54 ++---
 .../SortNode.java => source/ShowQueriesNode.java}  |  82 +++----
 .../plan/node/source/VirtualSourceNode.java        |  41 ++++
 .../planner/plan/parameter/OrderByParameter.java   |  10 +
 .../plan/statement/sys/ShowQueriesStatement.java   |  25 +++
 .../execution/operator/MergeSortOperatorTest.java  | 243 ++++++++++++++++++++-
 .../mpp/execution/operator/OperatorMemoryTest.java |  19 +-
 .../plan/plan/node/process/SortNodeSerdeTest.java  |  10 +-
 30 files changed, 1000 insertions(+), 131 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 5fa693b2a8..16c190a25e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -18,55 +18,120 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
 public class SortOperator implements ProcessOperator {
+  private final OperatorContext operatorContext;
+  private final Operator inputOperator;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private List<MergeSortKey> cachedData;
+  private final Comparator<MergeSortKey> comparator;
+
+  public SortOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> dataTypes,
+      Comparator<MergeSortKey> comparator) {
+    this.operatorContext = operatorContext;
+    this.inputOperator = inputOperator;
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.cachedData = new ArrayList<>();
+    this.comparator = comparator;
+  }
 
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    return inputOperator.isBlocked();
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    TsBlock tsBlock = inputOperator.nextWithTimer();
+    if (tsBlock == null) {
+      return null;
+    }
+    // add data of each TsBlock from child into list
+    for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+      cachedData.add(new MergeSortKey(tsBlock, i));
+    }
+    // child has more data, can't calculate
+    if (inputOperator.hasNextWithTimer()) {
+      return null;
+    }
+
+    if (cachedData.size() > 1) {
+      cachedData.sort(comparator);
+    }
+    TsBlock result = buildTsBlock();
+    cachedData = null;
+    return result;
+  }
+
+  private TsBlock buildTsBlock() {
+    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    cachedData.forEach(
+        mergeSortKey -> {
+          TsBlock tsBlock = mergeSortKey.tsBlock;
+          int row = mergeSortKey.rowIndex;
+          timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(row));
+          for (int i = 0; i < valueColumnBuilders.length; i++) {
+            valueColumnBuilders[i].write(tsBlock.getColumn(i), row);
+          }
+          tsBlockBuilder.declarePosition();
+        });
+    return tsBlockBuilder.build();
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return inputOperator.hasNextWithTimer();
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    inputOperator.close();
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return cachedData == null;
   }
 
   @Override
   public long calculateMaxPeekMemory() {
-    return 0;
+    // In fact, we need to cache all data from input.
+    // Now the child of this Operator only will be ShowQueries, it only returns one Block.
+    return inputOperator.calculateMaxPeekMemory()
+        + inputOperator.calculateRetainedSizeAfterCallingNext();
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return 0;
+    return inputOperator.calculateMaxReturnSize();
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return 0;
+    return inputOperator.calculateRetainedSizeAfterCallingNext();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
index da17deed4e..b25add1a25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
@@ -23,7 +23,11 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import org.apache.commons.collections4.comparators.ComparatorChain;
+
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
@@ -123,23 +127,99 @@ public class MergeSortComparator {
             : deviceComparing;
       };
 
-  public static Comparator<MergeSortKey> getComparator(List<SortItem> sortItemList) {
-    if (sortItemList.get(0).getOrdering() == Ordering.ASC) {
-      if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
-        if (sortItemList.get(0).getSortKey() == SortKey.TIME) return ASC_TIME_ASC_DEVICE;
-        else return ASC_DEVICE_ASC_TIME;
-      } else {
-        if (sortItemList.get(0).getSortKey() == SortKey.TIME) return ASC_TIME_DESC_DEVICE;
-        else return ASC_DEVICE_DESC_TIME;
-      }
-    } else {
-      if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
-        if (sortItemList.get(0).getSortKey() == SortKey.TIME) return DESC_TIME_ASC_DEVICE;
-        else return DESC_DEVICE_ASC_TIME;
+  public static Comparator<MergeSortKey> getComparator(
+      List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType> dataTypeList) {
+    // specified for order by time, device or order by device, time
+    if (sortItemList.size() == 2
+        && ((sortItemList.get(0).getSortKey() == SortKey.TIME
+                && sortItemList.get(1).getSortKey() == SortKey.DEVICE)
+            || (sortItemList.get(0).getSortKey() == SortKey.DEVICE
+                && sortItemList.get(1).getSortKey() == SortKey.TIME))) {
+      if (sortItemList.get(0).getOrdering() == Ordering.ASC) {
+        if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
+          if (sortItemList.get(0).getSortKey() == SortKey.TIME) return ASC_TIME_ASC_DEVICE;
+          else return ASC_DEVICE_ASC_TIME;
+        } else {
+          if (sortItemList.get(0).getSortKey() == SortKey.TIME) return ASC_TIME_DESC_DEVICE;
+          else return ASC_DEVICE_DESC_TIME;
+        }
       } else {
-        if (sortItemList.get(0).getSortKey() == SortKey.TIME) return DESC_TIME_DESC_DEVICE;
-        else return DESC_DEVICE_DESC_TIME;
+        if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
+          if (sortItemList.get(0).getSortKey() == SortKey.TIME) return DESC_TIME_ASC_DEVICE;
+          else return DESC_DEVICE_ASC_TIME;
+        } else {
+          if (sortItemList.get(0).getSortKey() == SortKey.TIME) return DESC_TIME_DESC_DEVICE;
+          else return DESC_DEVICE_DESC_TIME;
+        }
       }
+    } else { // generally order by
+      // use code-gen compile this comparator
+
+      // currently only show queries use merge sort and will only contain TIME, QUERYID, DATANODEID,
+      // ELAPSEDTIME, STATEMENT
+      return genComparatorChain(sortItemList, indexList, dataTypeList);
+    }
+  }
+
+  /** @param indexList -1 for time column */
+  private static ComparatorChain<MergeSortKey> genComparatorChain(
+      List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType> dataTypeList) {
+    List<Comparator<MergeSortKey>> list = new ArrayList<>(indexList.size());
+    for (int i = 0; i < indexList.size(); i++) {
+      int index = indexList.get(i);
+      TSDataType dataType = dataTypeList.get(i);
+      boolean asc = sortItemList.get(i).getOrdering() == Ordering.ASC;
+      list.add(genSingleComparator(asc, index, dataType));
+    }
+    return new ComparatorChain<>(list);
+  }
+
+  private static Comparator<MergeSortKey> genSingleComparator(
+      boolean asc, int index, TSDataType dataType) {
+    Comparator<MergeSortKey> comparator;
+    switch (dataType) {
+      case INT32:
+        comparator =
+            Comparator.comparingInt(
+                (MergeSortKey sortKey) ->
+                    sortKey.tsBlock.getColumn(index).getInt(sortKey.rowIndex));
+        break;
+      case INT64:
+        if (index == -1) {
+          comparator =
+              Comparator.comparingLong(
+                  (MergeSortKey sortKey) -> sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex));
+        } else {
+          comparator =
+              Comparator.comparingLong(
+                  (MergeSortKey sortKey) ->
+                      sortKey.tsBlock.getColumn(index).getLong(sortKey.rowIndex));
+        }
+        break;
+      case FLOAT:
+        comparator =
+            Comparator.comparingDouble(
+                (MergeSortKey sortKey) ->
+                    sortKey.tsBlock.getColumn(index).getFloat(sortKey.rowIndex));
+        break;
+      case DOUBLE:
+        comparator =
+            Comparator.comparingDouble(
+                (MergeSortKey sortKey) ->
+                    sortKey.tsBlock.getColumn(index).getDouble(sortKey.rowIndex));
+        break;
+      case TEXT:
+        comparator =
+            Comparator.comparing(
+                (MergeSortKey sortKey) ->
+                    sortKey.tsBlock.getColumn(index).getBinary(sortKey.rowIndex));
+        break;
+      default:
+        throw new IllegalArgumentException("Data type: " + dataType + " cannot be ordered");
+    }
+    if (!asc) {
+      comparator = comparator.reversed();
     }
+    return comparator;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
new file mode 100644
index 0000000000..0a9c86d0c2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class ShowQueriesOperator implements SourceOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final PlanNodeId sourceId;
+
+  private TsBlock tsBlock;
+  private boolean hasConsumed;
+
+  private final Coordinator coordinator;
+
+  public ShowQueriesOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, Coordinator coordinator) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.coordinator = coordinator;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    TsBlock res = tsBlock;
+    hasConsumed = true;
+    tsBlock = null;
+    return res;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (hasConsumed) {
+      return false;
+    }
+    if (tsBlock == null) {
+      tsBlock = buildTsBlock();
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return hasConsumed;
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public void close() throws Exception {}
+
+  private TsBlock buildTsBlock() {
+    List<TSDataType> outputDataTypes =
+        DatasetHeaderFactory.getShowQueriesHeader().getRespDataTypes();
+    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+    List<IQueryExecution> queryExecutions = coordinator.getAllQueryExecutions();
+
+    if (!queryExecutions.isEmpty()) {
+      TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+      ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders();
+      long currTime = System.currentTimeMillis();
+      String[] splits = queryExecutions.get(0).getQueryId().split("_");
+      int DataNodeId = Integer.parseInt(splits[splits.length - 1]);
+
+      for (IQueryExecution queryExecution : queryExecutions) {
+        timeColumnBuilder.writeLong(queryExecution.getStartExecutionTime());
+        columnBuilders[0].writeBinary(Binary.valueOf(queryExecution.getQueryId()));
+        columnBuilders[1].writeInt(DataNodeId);
+        columnBuilders[2].writeFloat(
+            (float) (currTime - queryExecution.getStartExecutionTime()) / 1000);
+        columnBuilders[3].writeBinary(
+            Binary.valueOf(queryExecution.getExecuteSQL().orElse("UNKNOWN")));
+        builder.declarePosition();
+      }
+    }
+    return builder.build();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index e2bb953f26..3fb52f929b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -44,6 +44,8 @@ import org.apache.iotdb.db.utils.SetThreadName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -169,6 +171,10 @@ public class Coordinator {
     return queryExecutionMap.get(queryId);
   }
 
+  public List<IQueryExecution> getAllQueryExecutions() {
+    return new ArrayList<>(queryExecutionMap.values());
+  }
+
   // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
   private ExecutorService getQueryExecutor() {
     int coordinatorReadExecutorSize =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 58c362f38a..92f9018ca5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -163,6 +163,9 @@ public class Analysis {
   // header of result dataset
   private DatasetHeader respDatasetHeader;
 
+  // indicate whether the Nodes produce source data are VirtualSourceNodes
+  private boolean isVirtualSource = false;
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // SELECT INTO Analysis
   /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -522,4 +525,12 @@ public class Analysis {
   public void setRunningDataNodeLocations(List<TDataNodeLocation> runningDataNodeLocations) {
     this.runningDataNodeLocations = runningDataNodeLocations;
   }
+
+  public boolean isVirtualSource() {
+    return isVirtualSource;
+  }
+
+  public void setVirtualSource(boolean virtualSource) {
+    isVirtualSource = virtualSource;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index d4514bb892..accb761aa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -2516,16 +2516,38 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     Analysis analysis = new Analysis();
     analysis.setStatement(showQueriesStatement);
     analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowQueriesHeader());
+    analysis.setVirtualSource(true);
 
     List<TDataNodeLocation> allRunningDataNodeLocations = getRunningDataNodeLocations();
     if (allRunningDataNodeLocations.isEmpty()) {
       analysis.setFinishQueryAfterAnalyze(true);
     }
     // TODO Constant folding optimization for Where Predicate after True/False Constant introduced
+    if (allRunningDataNodeLocations.isEmpty()) {
+      throw new StatementAnalyzeException("no Running DataNodes");
+    }
     analysis.setRunningDataNodeLocations(allRunningDataNodeLocations);
 
+    Set<Expression> sourceExpressions =
+        analysis.getRespDatasetHeader().getColumnHeaders().stream()
+            .map(
+                columnHeader -> {
+                  try {
+                    return new TimeSeriesOperand(
+                        new MeasurementPath(
+                            columnHeader.getColumnName(), columnHeader.getColumnType()));
+                  } catch (IllegalPathException ignored) {
+                  }
+                  return null;
+                })
+            .collect(Collectors.toSet());
+    analysis.setSourceExpressions(sourceExpressions);
+    sourceExpressions.forEach(expression -> analyzeExpression(analysis, expression));
+
     analyzeWhere(analysis, showQueriesStatement);
 
+    analysis.setMergeOrderParameter(new OrderByParameter(showQueriesStatement.getSortItemList()));
+
     return analysis;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index 86e10f6a47..1d9f464667 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -51,6 +51,8 @@ public interface IQueryExecution {
 
   String getQueryId();
 
+  long getStartExecutionTime();
+
   void recordExecutionTime(long executionTime);
 
   long getTotalExecutionTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index bda1af235e..7ed5f82aae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -629,6 +629,11 @@ public class QueryExecution implements IQueryExecution {
     return context.getQueryId().getId();
   }
 
+  @Override
+  public long getStartExecutionTime() {
+    return context.getStartTime();
+  }
+
   @Override
   public void recordExecutionTime(long executionTime) {
     totalExecutionTime += executionTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index 29dc1b8aeb..8762cd748e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -219,6 +219,11 @@ public class ConfigExecution implements IQueryExecution {
     return context.getQueryId().getId();
   }
 
+  @Override
+  public long getStartExecutionTime() {
+    return context.getStartTime();
+  }
+
   @Override
   public void recordExecutionTime(long executionTime) {
     totalExecutionTime += executionTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 385473deb6..9d868a23a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -2529,6 +2529,8 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
             parseOffsetClause(ctx.rowPaginationClause().offsetClause()));
       }
     }
+
+    showQueriesStatement.setZoneId(zoneId);
     return showQueriesStatement;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 744d3aceef..ae52725c12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.planner;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.AlignedPath;
@@ -64,6 +65,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -74,6 +76,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationType;
@@ -86,6 +89,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -1111,4 +1115,62 @@ public class LogicalPlanBuilder {
             context.getQueryId().genPlanNodeId(), pathPatternList, templateId);
     return this;
   }
+
+  private LogicalPlanBuilder planSort(OrderByParameter orderByParameter) {
+    if (orderByParameter.isEmpty()) {
+      return this;
+    }
+    this.root = new SortNode(context.getQueryId().genPlanNodeId(), root, orderByParameter);
+    return this;
+  }
+
+  public LogicalPlanBuilder planShowQueries(Analysis analysis, ShowQueriesStatement statement) {
+    List<TDataNodeLocation> dataNodeLocations = analysis.getRunningDataNodeLocations();
+    if (dataNodeLocations.size() == 1) {
+      this.root =
+          planSingleShowQueries(dataNodeLocations.get(0))
+              .planFilterAndTransform(
+                  analysis.getWhereExpression(),
+                  analysis.getSourceExpressions(),
+                  false,
+                  statement.getZoneId(),
+                  Ordering.ASC)
+              .planSort(analysis.getMergeOrderParameter())
+              .getRoot();
+    } else {
+      List<String> outputColumns = new ArrayList<>();
+      MergeSortNode mergeSortNode =
+          new MergeSortNode(
+              context.getQueryId().genPlanNodeId(),
+              analysis.getMergeOrderParameter(),
+              outputColumns);
+
+      dataNodeLocations.forEach(
+          dataNodeLocation ->
+              mergeSortNode.addChild(
+                  this.planSingleShowQueries(dataNodeLocation)
+                      .planFilterAndTransform(
+                          analysis.getWhereExpression(),
+                          analysis.getSourceExpressions(),
+                          false,
+                          statement.getZoneId(),
+                          Ordering.ASC)
+                      .planSort(analysis.getMergeOrderParameter())
+                      .getRoot()));
+      outputColumns.addAll(mergeSortNode.getChildren().get(0).getOutputColumnNames());
+      this.root = mergeSortNode;
+    }
+
+    ColumnHeaderConstant.showQueriesColumnHeaders.forEach(
+        columnHeader ->
+            context
+                .getTypeProvider()
+                .setType(columnHeader.getColumnName(), columnHeader.getColumnType()));
+    return this;
+  }
+
+  private LogicalPlanBuilder planSingleShowQueries(TDataNodeLocation dataNodeLocation) {
+    this.root = new ShowQueriesNode(context.getQueryId().genPlanNodeId(), dataNodeLocation);
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 7dd495ab78..1b78534118 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -712,7 +712,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
   public PlanNode visitShowQueries(
       ShowQueriesStatement showQueriesStatement, MPPQueryContext context) {
     LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
-    // TODO planBuilder = planBuilder.planShowQueries()
+    planBuilder =
+        planBuilder
+            .planShowQueries(analysis, showQueriesStatement) // push Filter down
+            .planOffset(showQueriesStatement.getRowOffset())
+            .planLimit(showQueriesStatement.getRowLimit());
     return planBuilder.getRoot();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
index bd187f74f4..e257554300 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
@@ -69,6 +69,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
 
 import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode;
 
@@ -424,6 +425,12 @@ public class MemoryDistributionCalculator
     return null;
   }
 
+  @Override
+  public Void visitShowQueries(ShowQueriesNode node, MemoryDistributionContext context) {
+    // do nothing since VirtualSourceNode will not have Exchange/FragmentSink as child
+    return null;
+  }
+
   enum MemoryDistributionType {
     /**
      * This type means that this node needs data from all the children. For example, TimeJoinNode.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 75f07c6fc2..b0be9ca0d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
@@ -115,6 +116,8 @@ import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperat
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ExpressionTypeAnalyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
@@ -168,6 +171,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
@@ -752,8 +756,43 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
     List<SortItem> sortItemList = node.getMergeOrderParameter().getSortItemList();
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+    List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size());
+    genSortInformation(
+        node.getOutputColumnNames(),
+        dataTypes,
+        sortItemList,
+        sortItemIndexList,
+        sortItemDataTypeList);
     return new MergeSortOperator(
-        operatorContext, children, dataTypes, MergeSortComparator.getComparator(sortItemList));
+        operatorContext,
+        children,
+        dataTypes,
+        MergeSortComparator.getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
+  }
+
+  private void genSortInformation(
+      List<String> outputColumnNames,
+      List<TSDataType> dataTypes,
+      List<SortItem> sortItemList,
+      List<Integer> sortItemIndexList,
+      List<TSDataType> sortItemDataTypeList) {
+    sortItemList.forEach(
+        sortItem -> {
+          if (sortItem.getSortKey() == SortKey.TIME) {
+            sortItemIndexList.add(-1);
+            sortItemDataTypeList.add(TSDataType.INT64);
+          } else {
+            for (int i = 0; i < outputColumnNames.size(); i++) {
+              if (sortItem.getSortKey().toString().equalsIgnoreCase(outputColumnNames.get(i))) {
+                sortItemIndexList.add(i);
+                sortItemDataTypeList.add(dataTypes.get(i));
+                break;
+              }
+            }
+          }
+        });
   }
 
   @Override
@@ -1405,7 +1444,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
   @Override
   public Operator visitSort(SortNode node, LocalExecutionPlanContext context) {
-    return super.visitSort(node, context);
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                DeviceViewIntoOperator.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+    List<SortItem> sortItemList = node.getOrderByParameter().getSortItemList();
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+    List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size());
+    genSortInformation(
+        node.getOutputColumnNames(),
+        dataTypes,
+        sortItemList,
+        sortItemIndexList,
+        sortItemDataTypeList);
+    return new SortOperator(
+        operatorContext,
+        child,
+        dataTypes,
+        MergeSortComparator.getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
   }
 
   @Override
@@ -1621,6 +1685,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return new VerticallyConcatOperator(operatorContext, children, outputColumnTypes);
   }
 
+  @Override
+  public Operator visitShowQueries(ShowQueriesNode node, LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                ShowQueriesOperator.class.getSimpleName());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
+    return new ShowQueriesOperator(
+        operatorContext, node.getPlanNodeId(), Coordinator.getInstance());
+  }
+
   private List<OutputColumn> generateOutputColumnsFromChildren(MultiChildProcessNode node) {
     // TODO we should also sort the InputLocation for each column if they are not overlapped
     return makeLayout(node).values().stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 965dcb1b56..0c564a3fa1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 
 import java.util.List;
 
@@ -73,7 +74,8 @@ public class DistributionPlanner {
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
-    if (analysis.getStatement() instanceof QueryStatement) {
+    if (analysis.getStatement() instanceof QueryStatement
+        || analysis.getStatement() instanceof ShowQueriesStatement) {
       analysis
           .getRespDatasetHeader()
           .setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index cc24dd0c97..2d48a9d0fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -217,6 +217,9 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
 
   @Override
   public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context) {
+    if (analysis.isVirtualSource()) {
+      return processMultiChildNodeByLocation(node, context);
+    }
     // 1. Group children by dataRegion
     Map<TRegionReplicaSet, List<PlanNode>> childrenGroupMap = new HashMap<>();
     for (int i = 0; i < node.getChildren().size(); i++) {
@@ -412,6 +415,10 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
   }
 
   private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupContext context) {
+    if (analysis.isVirtualSource()) {
+      return processMultiChildNodeByLocation(node, context);
+    }
+
     MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
     List<PlanNode> visitedChildren = new ArrayList<>();
     node.getChildren()
@@ -455,6 +462,23 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
+  private PlanNode processMultiChildNodeByLocation(
+      MultiChildProcessNode node, NodeGroupContext context) {
+    MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
+
+    List<PlanNode> children = node.getChildren();
+    newNode.addChild(children.get(0));
+    for (int i = 1; i < children.size(); i++) {
+      PlanNode child = children.get(i);
+      ExchangeNode exchangeNode =
+          new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+      exchangeNode.setChild(child);
+      exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+      newNode.addChild(exchangeNode);
+    }
+    return newNode;
+  }
+
   @Override
   public PlanNode visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, NodeGroupContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 96719c60cc..2ee598e597 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import org.slf4j.Logger;
@@ -114,21 +115,27 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // We need to store all the replica host in case of the scenario that the instance need to be
     // redirected
     // to another host when scheduling
-    if ((analysis.getDataPartitionInfo() == null || analysis.getDataPartitionInfo().isEmpty())
-        && (analysis.getStatement() instanceof QueryStatement
-            && ((QueryStatement) analysis.getStatement()).isAggregationQuery())) {
-      // AggregationQuery && no data region, we need to execute this FI on local
-      fragmentInstance.setExecutorAndHost(
-          new QueryExecutor(
-              new TDataNodeLocation()
-                  .setInternalEndPoint(DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT)
-                  .setMPPDataExchangeEndPoint(DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT)));
+    if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) {
+      TDataNodeLocation dataNodeLocation = fragment.getTargetLocation();
+      if (dataNodeLocation != null) {
+        // now only the case ShowQueries will enter here
+        fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation));
+      } else {
+        // no data region && no dataNodeLocation, we need to execute this FI on local
+        // now only the case AggregationQuery has schema but no data region will enter here
+        fragmentInstance.setExecutorAndHost(
+            new QueryExecutor(
+                new TDataNodeLocation()
+                    .setInternalEndPoint(DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT)
+                    .setMPPDataExchangeEndPoint(DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT)));
+      }
     } else {
       fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
       fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
     }
 
-    if (analysis.getStatement() instanceof QueryStatement) {
+    if (analysis.getStatement() instanceof QueryStatement
+        || analysis.getStatement() instanceof ShowQueriesStatement) {
       fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 20d703a945..80e446cf18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -79,7 +79,7 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDC
 
 public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
 
-  private Analysis analysis;
+  private final Analysis analysis;
 
   public SourceRewriter(Analysis analysis) {
     this.analysis = analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index c21cfa28fc..aac15e7940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner.plan;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.VirtualSourceNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -89,7 +91,7 @@ public class PlanFragment {
     return String.format("PlanFragment-%s", getId());
   }
 
-  // Every Fragment should only run in DataRegion.
+  // Every Fragment related with DataPartition should only run in one DataRegion.
   // But it can select any one of the Endpoint of the target DataRegion
   // In current version, one PlanFragment should contain at least one SourceNode,
   // and the DataRegions of all SourceNodes should be same in one PlanFragment.
@@ -98,6 +100,13 @@ public class PlanFragment {
     return getNodeRegion(planNodeTree);
   }
 
+  // If a Fragment is not related with DataPartition,
+  // it may be related with a specific DataNode.
+  // This method return the DataNodeLocation will offer execution of this Fragment.
+  public TDataNodeLocation getTargetLocation() {
+    return getNodeLocation(planNodeTree);
+  }
+
   private TRegionReplicaSet getNodeRegion(PlanNode root) {
     if (root instanceof IPartitionRelatedNode) {
       return ((IPartitionRelatedNode) root).getRegionReplicaSet();
@@ -111,6 +120,19 @@ public class PlanFragment {
     return null;
   }
 
+  private TDataNodeLocation getNodeLocation(PlanNode root) {
+    if (root instanceof VirtualSourceNode) {
+      return ((VirtualSourceNode) root).getDataNodeLocation();
+    }
+    for (PlanNode child : root.getChildren()) {
+      TDataNodeLocation result = getNodeLocation(child);
+      if (result != null) {
+        return result;
+      }
+    }
+    return null;
+  }
+
   public PlanNode getPlanNodeById(PlanNodeId nodeId) {
     return getPlanNodeById(planNodeTree, nodeId);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index 58bbe7b371..dfdaa9e77b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -56,7 +56,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.commons.lang3.Validate;
@@ -169,12 +168,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("MergeSort-%s", node.getPlanNodeId().getId()));
     boxValue.add(String.format("ChildrenCount: %d", node.getChildren().size()));
-    StringBuilder sortInfo = new StringBuilder("Order:");
-    for (SortItem sortItem : node.getMergeOrderParameter().getSortItemList()) {
-      sortInfo.append(" ");
-      sortInfo.append(sortItem.getSortKey()).append(" ").append(sortItem.getOrdering());
-    }
-    boxValue.add(sortInfo.toString());
+    boxValue.add(node.getMergeOrderParameter().toString());
     return render(node, boxValue, context);
   }
 
@@ -284,7 +278,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
   public List<String> visitSort(SortNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("Sort-%s", node.getPlanNodeId().getId()));
-    boxValue.add(String.format("OrderBy: %s", node.getSortOrder()));
+    boxValue.add(node.getOrderByParameter().toString());
     return render(node, boxValue, context);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 70999dc816..a289b1145b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -156,7 +157,8 @@ public enum PlanNodeType {
   DEVICE_VIEW_INTO((short) 63),
   VERTICALLY_CONCAT((short) 64),
   SINGLE_DEVICE_VIEW((short) 65),
-  MERGE_SORT((short) 66);
+  MERGE_SORT((short) 66),
+  SHOW_QUERIES((short) 67);
 
   public static final int BYTES = Short.BYTES;
 
@@ -339,6 +341,8 @@ public enum PlanNodeType {
         return SingleDeviceViewNode.deserialize(buffer);
       case 66:
         return MergeSortNode.deserialize(buffer);
+      case 67:
+        return ShowQueriesNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index f7eb7d5704..96422f5074 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -341,4 +342,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitVerticallyConcat(VerticallyConcatNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitShowQueries(ShowQueriesNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
index e033fd163d..f61b34e842 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
@@ -22,40 +22,36 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+
+import com.google.common.base.Objects;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Objects;
 
-/**
- * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
- * optimized logical query plan, the sortNode should not appear.
- */
 public class SortNode extends SingleChildProcessNode {
 
-  private final Ordering sortOrder;
+  private final OrderByParameter orderByParameter;
 
-  public SortNode(PlanNodeId id, Ordering sortOrder) {
-    super(id);
-    this.sortOrder = sortOrder;
+  public SortNode(PlanNodeId id, PlanNode child, OrderByParameter orderByParameter) {
+    super(id, child);
+    this.orderByParameter = orderByParameter;
   }
 
-  public SortNode(PlanNodeId id, PlanNode child, Ordering sortOrder) {
-    super(id, child);
-    this.sortOrder = sortOrder;
+  public SortNode(PlanNodeId id, OrderByParameter orderByParameter) {
+    super(id);
+    this.orderByParameter = orderByParameter;
   }
 
-  public Ordering getSortOrder() {
-    return sortOrder;
+  public OrderByParameter getOrderByParameter() {
+    return orderByParameter;
   }
 
   @Override
   public PlanNode clone() {
-    return new SortNode(getPlanNodeId(), sortOrder);
+    return new SortNode(getPlanNodeId(), child, orderByParameter);
   }
 
   @Override
@@ -71,38 +67,32 @@ public class SortNode extends SingleChildProcessNode {
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.SORT.serialize(byteBuffer);
-    ReadWriteIOUtils.write(sortOrder.ordinal(), byteBuffer);
+    orderByParameter.serializeAttributes(byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
     PlanNodeType.SORT.serialize(stream);
-    ReadWriteIOUtils.write(sortOrder.ordinal(), stream);
+    orderByParameter.serializeAttributes(stream);
   }
 
   public static SortNode deserialize(ByteBuffer byteBuffer) {
-    Ordering orderBy = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    OrderByParameter orderByParameter = OrderByParameter.deserialize(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new SortNode(planNodeId, orderBy);
+    return new SortNode(planNodeId, orderByParameter);
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
     SortNode sortNode = (SortNode) o;
-    return sortOrder == sortNode.sortOrder;
+    return Objects.equal(orderByParameter, sortNode.orderByParameter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), sortOrder);
+    return Objects.hashCode(super.hashCode(), orderByParameter);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/ShowQueriesNode.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/ShowQueriesNode.java
index e033fd163d..8ecdd89608 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/ShowQueriesNode.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,14 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,78 +33,78 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
 
-/**
- * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
- * optimized logical query plan, the sortNode should not appear.
- */
-public class SortNode extends SingleChildProcessNode {
+public class ShowQueriesNode extends VirtualSourceNode {
 
-  private final Ordering sortOrder;
+  public static final List<String> SHOW_QUERIES_HEADER_COLUMNS =
+      ImmutableList.of(
+          ColumnHeaderConstant.QUERY_ID,
+          ColumnHeaderConstant.DATA_NODE_ID,
+          ColumnHeaderConstant.ELAPSED_TIME,
+          ColumnHeaderConstant.STATEMENT);
 
-  public SortNode(PlanNodeId id, Ordering sortOrder) {
-    super(id);
-    this.sortOrder = sortOrder;
+  public ShowQueriesNode(PlanNodeId id, TDataNodeLocation dataNodeLocation) {
+    super(id, dataNodeLocation);
   }
 
-  public SortNode(PlanNodeId id, PlanNode child, Ordering sortOrder) {
-    super(id, child);
-    this.sortOrder = sortOrder;
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of();
   }
 
-  public Ordering getSortOrder() {
-    return sortOrder;
+  @Override
+  public void addChild(PlanNode child) {
+    throw new UnsupportedOperationException("no child is allowed for ShowQueriesNode");
   }
 
   @Override
   public PlanNode clone() {
-    return new SortNode(getPlanNodeId(), sortOrder);
+    return new ShowQueriesNode(getPlanNodeId(), getDataNodeLocation());
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
+    return SHOW_QUERIES_HEADER_COLUMNS;
   }
 
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitSort(this, context);
+    return visitor.visitShowQueries(this, context);
   }
 
+  // We only use DataNodeLocation when do distributionPlan, so DataNodeLocation is no need to
+  // serialize
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.SORT.serialize(byteBuffer);
-    ReadWriteIOUtils.write(sortOrder.ordinal(), byteBuffer);
+    PlanNodeType.SHOW_QUERIES.serialize(byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
-    PlanNodeType.SORT.serialize(stream);
-    ReadWriteIOUtils.write(sortOrder.ordinal(), stream);
+    PlanNodeType.SHOW_QUERIES.serialize(stream);
   }
 
-  public static SortNode deserialize(ByteBuffer byteBuffer) {
-    Ordering orderBy = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+  public static ShowQueriesNode deserialize(ByteBuffer byteBuffer) {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new SortNode(planNodeId, orderBy);
+    return new ShowQueriesNode(planNodeId, null);
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-    SortNode sortNode = (SortNode) o;
-    return sortOrder == sortNode.sortOrder;
+    return super.equals(o);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), sortOrder);
+    return Objects.hash(super.hashCode());
+  }
+
+  @Override
+  public String toString() {
+    return "ShowQueriesNode-" + this.getPlanNodeId();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/VirtualSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/VirtualSourceNode.java
new file mode 100644
index 0000000000..b012826385
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/VirtualSourceNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+public abstract class VirtualSourceNode extends PlanNode {
+
+  private TDataNodeLocation dataNodeLocation;
+
+  public VirtualSourceNode(PlanNodeId id, TDataNodeLocation dataNodeLocation) {
+    super(id);
+    this.dataNodeLocation = dataNodeLocation;
+  }
+
+  public void setDataNodeLocation(TDataNodeLocation dataNodeLocation) {
+    this.dataNodeLocation = dataNodeLocation;
+  }
+
+  public TDataNodeLocation getDataNodeLocation() {
+    return dataNodeLocation;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OrderByParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OrderByParameter.java
index 6310eb7942..2adf7b1bc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OrderByParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OrderByParameter.java
@@ -50,6 +50,16 @@ public class OrderByParameter {
     return sortItemList.isEmpty();
   }
 
+  @Override
+  public String toString() {
+    StringBuilder sortInfo = new StringBuilder("OrderBy:");
+    for (SortItem sortItem : sortItemList) {
+      sortInfo.append(" ");
+      sortInfo.append(sortItem.toSQLString());
+    }
+    return sortInfo.toString();
+  }
+
   public void serializeAttributes(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(sortItemList.size(), byteBuffer);
     for (SortItem sortItem : sortItemList) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
index 33d492b1b1..1b3439519d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
@@ -21,9 +21,16 @@ package org.apache.iotdb.db.mpp.plan.statement.sys;
 
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStatement;
 
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+
 public class ShowQueriesStatement extends ShowStatement {
 
   private WhereCondition whereCondition;
@@ -33,6 +40,8 @@ public class ShowQueriesStatement extends ShowStatement {
   private int rowLimit;
   private int rowOffset;
 
+  private ZoneId zoneId;
+
   public ShowQueriesStatement() {}
 
   @Override
@@ -56,6 +65,14 @@ public class ShowQueriesStatement extends ShowStatement {
     return orderByComponent;
   }
 
+  public List<SortItem> getSortItemList() {
+    if (orderByComponent == null) {
+      // default order
+      return Collections.singletonList(new SortItem(SortKey.TIME, Ordering.ASC));
+    }
+    return orderByComponent.getSortItemList();
+  }
+
   public void setRowLimit(int rowLimit) {
     this.rowLimit = rowLimit;
   }
@@ -71,4 +88,12 @@ public class ShowQueriesStatement extends ShowStatement {
   public int getRowOffset() {
     return rowOffset;
   }
+
+  public ZoneId getZoneId() {
+    return zoneId;
+  }
+
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index cd5831502e..9a59a462e6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -20,30 +20,41 @@ package org.apache.iotdb.db.mpp.execution.operator;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.MergeSortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MergeSortComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -53,14 +64,19 @@ import io.airlift.units.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -83,8 +99,12 @@ public class MergeSortOperatorTest {
   private static final String DEVICE2 = MERGE_SORT_OPERATOR_TEST_SG + ".device2";
   private static final String DEVICE3 = MERGE_SORT_OPERATOR_TEST_SG + ".device3";
 
+  private int dataNodeId;
+
   @Before
   public void setUp() throws MetadataException, IOException, WriteProcessException {
+    dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
     SeriesReaderTestUtil.setUp(
         measurementSchemas, deviceIds, seqResources, unSeqResources, MERGE_SORT_OPERATOR_TEST_SG);
   }
@@ -92,6 +112,7 @@ public class MergeSortOperatorTest {
   @After
   public void tearDown() throws IOException {
     SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
   }
 
   long getValue(long expectedTime) {
@@ -317,7 +338,9 @@ public class MergeSortOperatorTest {
           MergeSortComparator.getComparator(
               Arrays.asList(
                   new SortItem(SortKey.TIME, timeOrdering),
-                  new SortItem(SortKey.DEVICE, deviceOrdering))));
+                  new SortItem(SortKey.DEVICE, deviceOrdering)),
+              null,
+              null));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -791,7 +814,9 @@ public class MergeSortOperatorTest {
               MergeSortComparator.getComparator(
                   Arrays.asList(
                       new SortItem(SortKey.TIME, timeOrdering),
-                      new SortItem(SortKey.DEVICE, deviceOrdering))));
+                      new SortItem(SortKey.DEVICE, deviceOrdering)),
+                  null,
+                  null));
       MergeSortOperator mergeSortOperator2 =
           new MergeSortOperator(
               fragmentInstanceContext.getOperatorContexts().get(15),
@@ -800,7 +825,9 @@ public class MergeSortOperatorTest {
               MergeSortComparator.getComparator(
                   Arrays.asList(
                       new SortItem(SortKey.TIME, timeOrdering),
-                      new SortItem(SortKey.DEVICE, deviceOrdering))));
+                      new SortItem(SortKey.DEVICE, deviceOrdering)),
+                  null,
+                  null));
 
       return new MergeSortOperator(
           fragmentInstanceContext.getOperatorContexts().get(16),
@@ -809,7 +836,9 @@ public class MergeSortOperatorTest {
           MergeSortComparator.getComparator(
               Arrays.asList(
                   new SortItem(SortKey.TIME, timeOrdering),
-                  new SortItem(SortKey.DEVICE, deviceOrdering))));
+                  new SortItem(SortKey.DEVICE, deviceOrdering)),
+              null,
+              null));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -1257,7 +1286,9 @@ public class MergeSortOperatorTest {
           MergeSortComparator.getComparator(
               Arrays.asList(
                   new SortItem(SortKey.DEVICE, deviceOrdering),
-                  new SortItem(SortKey.TIME, timeOrdering))));
+                  new SortItem(SortKey.TIME, timeOrdering)),
+              null,
+              null));
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -1472,4 +1503,206 @@ public class MergeSortOperatorTest {
     }
     assertEquals(count, 2000);
   }
+
+  // ------------------------------------------------------------------------------------------------
+  //                                   order by Time, DataNodeID
+  // ------------------------------------------------------------------------------------------------
+  //
+  //                                     MergeSortOperator
+  //                                  ___________|__________
+  //                                 /                      \
+  //                           SortOperator             SortOperator
+  //                                |                        |
+  //                        ShowQueriesOperator      ShowQueriesOperator
+  // ------------------------------------------------------------------------------------------------
+  @Test
+  public void mergeSortWithSortOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId0 = new PlanNodeId("0");
+      fragmentInstanceContext.addOperatorContext(
+          0, planNodeId0, ShowQueriesOperator.class.getSimpleName());
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, ShowQueriesOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SortOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SortOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, MergeSortOperator.class.getSimpleName());
+
+      List<OperatorContext> operatorContexts = fragmentInstanceContext.getOperatorContexts();
+      List<TSDataType> dataTypes = DatasetHeaderFactory.getShowQueriesHeader().getRespDataTypes();
+      Comparator<MergeSortKey> comparator =
+          MergeSortComparator.getComparator(
+              Arrays.asList(
+                  new SortItem(SortKey.TIME, Ordering.ASC),
+                  new SortItem(SortKey.DATANODEID, Ordering.DESC)),
+              ImmutableList.of(-1, 1),
+              ImmutableList.of(TSDataType.INT64, TSDataType.INT32));
+
+      Coordinator coordinator1 = Mockito.mock(Coordinator.class);
+      Mockito.when(coordinator1.getAllQueryExecutions())
+          .thenReturn(
+              ImmutableList.of(
+                  new FakeQueryExecution(3, "20221229_000000_00003_1", "sql3_node1"),
+                  new FakeQueryExecution(1, "20221229_000000_00001_1", "sql1_node1"),
+                  new FakeQueryExecution(2, "20221229_000000_00002_1", "sql2_node1")));
+      Coordinator coordinator2 = Mockito.mock(Coordinator.class);
+      Mockito.when(coordinator2.getAllQueryExecutions())
+          .thenReturn(
+              ImmutableList.of(
+                  new FakeQueryExecution(3, "20221229_000000_00003_2", "sql3_node2"),
+                  new FakeQueryExecution(2, "20221229_000000_00002_2", "sql2_node2"),
+                  new FakeQueryExecution(1, "20221229_000000_00001_2", "sql1_node2")));
+
+      ShowQueriesOperator showQueriesOperator1 =
+          new ShowQueriesOperator(operatorContexts.get(0), planNodeId0, coordinator1);
+      ShowQueriesOperator showQueriesOperator2 =
+          new ShowQueriesOperator(operatorContexts.get(1), planNodeId1, coordinator2);
+      SortOperator sortOperator1 =
+          new SortOperator(operatorContexts.get(2), showQueriesOperator1, dataTypes, comparator);
+      SortOperator sortOperator2 =
+          new SortOperator(operatorContexts.get(3), showQueriesOperator2, dataTypes, comparator);
+      Operator root =
+          new MergeSortOperator(
+              operatorContexts.get(4),
+              ImmutableList.of(sortOperator1, sortOperator2),
+              dataTypes,
+              comparator);
+
+      int index = 0;
+      // Time ASC
+      long[] expectedTime = new long[] {1, 1, 2, 2, 3, 3};
+      // DataNodeId DESC if Times are equal
+      String[] expectedQueryId =
+          new String[] {
+            "20221229_000000_00001_2",
+            "20221229_000000_00001_1",
+            "20221229_000000_00002_2",
+            "20221229_000000_00002_1",
+            "20221229_000000_00003_2",
+            "20221229_000000_00003_1"
+          };
+      int[] expectedDataNodeId = new int[] {2, 1, 2, 1, 2, 1};
+      String[] expectedStatement =
+          new String[] {
+            "sql1_node2", "sql1_node1", "sql2_node2", "sql2_node1", "sql3_node2", "sql3_node1"
+          };
+      while (root.hasNext()) {
+        TsBlock result = root.next();
+        if (result == null) {
+          continue;
+        }
+
+        for (int i = 0; i < result.getPositionCount(); i++, index++) {
+          assertEquals(expectedTime[index], result.getTimeColumn().getLong(i));
+          assertEquals(expectedQueryId[index], result.getColumn(0).getBinary(i).toString());
+          assertEquals(expectedDataNodeId[index], result.getColumn(1).getInt(i));
+          assertEquals(expectedStatement[index], result.getColumn(3).getBinary(i).toString());
+        }
+      }
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  static class FakeQueryExecution implements IQueryExecution {
+    private final long startTime;
+    private final String queryId;
+    private final String sql;
+
+    FakeQueryExecution(long startTime, String queryId, String sql) {
+      this.startTime = startTime;
+      this.queryId = queryId;
+      this.sql = sql;
+    }
+
+    @Override
+    public String getQueryId() {
+      return queryId;
+    }
+
+    @Override
+    public long getStartExecutionTime() {
+      return startTime;
+    }
+
+    @Override
+    public void recordExecutionTime(long executionTime) {}
+
+    @Override
+    public long getTotalExecutionTime() {
+      return 0;
+    }
+
+    @Override
+    public Optional<String> getExecuteSQL() {
+      return Optional.of(sql);
+    }
+
+    @Override
+    public Statement getStatement() {
+      return null;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void stop() {}
+
+    @Override
+    public void stopAndCleanup() {}
+
+    @Override
+    public ExecutionResult getStatus() {
+      return null;
+    }
+
+    @Override
+    public Optional<TsBlock> getBatchResult() throws IoTDBException {
+      return Optional.empty();
+    }
+
+    @Override
+    public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
+      return Optional.empty();
+    }
+
+    @Override
+    public boolean hasNextResult() {
+      return false;
+    }
+
+    @Override
+    public int getOutputValueColumnCount() {
+      return 0;
+    }
+
+    @Override
+    public DatasetHeader getDatasetHeader() {
+      return null;
+    }
+
+    @Override
+    public boolean isQuery() {
+      return false;
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 69d6b2382c..8d57229b44 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -445,10 +445,21 @@ public class OperatorMemoryTest {
 
   @Test
   public void sortOperatorTest() {
-    SortOperator sortOperator = new SortOperator();
-    assertEquals(0, sortOperator.calculateMaxPeekMemory());
-    assertEquals(0, sortOperator.calculateMaxReturnSize());
-    assertEquals(0, sortOperator.calculateRetainedSizeAfterCallingNext());
+    Operator child = Mockito.mock(Operator.class);
+    Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L);
+    Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L);
+    Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L);
+
+    SortOperator sortOperator =
+        new SortOperator(
+            Mockito.mock(OperatorContext.class),
+            child,
+            Collections.singletonList(TSDataType.INT32),
+            null);
+
+    assertEquals(2048 + 512, sortOperator.calculateMaxPeekMemory());
+    assertEquals(1024, sortOperator.calculateMaxReturnSize());
+    assertEquals(512, sortOperator.calculateRetainedSizeAfterCallingNext());
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
index 91cee5952e..eb108b0efb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
@@ -24,11 +24,15 @@ import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
 import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
 
@@ -48,7 +52,11 @@ public class SortNodeSerdeTest {
             100,
             100,
             null);
-    SortNode sortNode = new SortNode(new PlanNodeId("TestSortNode"), seriesScanNode, Ordering.ASC);
+    SortNode sortNode =
+        new SortNode(
+            new PlanNodeId("TestSortNode"),
+            seriesScanNode,
+            new OrderByParameter(ImmutableList.of(new SortItem(SortKey.TIME, Ordering.ASC))));
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
     sortNode.serialize(byteBuffer);