You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/30 07:00:51 UTC
[iotdb] branch master updated: [IOTDB-4816]Show queries - implement LogicPlan & DistributionPlan & Operator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 82ffaa3c26 [IOTDB-4816]Show queries - implement LogicPlan & DistributionPlan & Operator
82ffaa3c26 is described below
commit 82ffaa3c268577f34f59b2b3b323910625904753
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Fri Dec 30 15:00:46 2022 +0800
[IOTDB-4816]Show queries - implement LogicPlan & DistributionPlan & Operator
---
.../execution/operator/process/SortOperator.java | 83 ++++++-
.../process/join/merge/MergeSortComparator.java | 110 ++++++++--
.../operator/source/ShowQueriesOperator.java | 140 ++++++++++++
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 6 +
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 11 +
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 22 ++
.../db/mpp/plan/execution/IQueryExecution.java | 2 +
.../db/mpp/plan/execution/QueryExecution.java | 5 +
.../mpp/plan/execution/config/ConfigExecution.java | 5 +
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 2 +
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 62 ++++++
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 6 +-
.../plan/planner/MemoryDistributionCalculator.java | 7 +
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 84 ++++++-
.../planner/distribution/DistributionPlanner.java | 4 +-
.../planner/distribution/ExchangeNodeAdder.java | 24 ++
.../SimpleFragmentParallelPlanner.java | 27 ++-
.../plan/planner/distribution/SourceRewriter.java | 2 +-
.../db/mpp/plan/planner/plan/PlanFragment.java | 24 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 10 +-
.../mpp/plan/planner/plan/node/PlanNodeType.java | 6 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 +
.../plan/planner/plan/node/process/SortNode.java | 54 ++---
.../SortNode.java => source/ShowQueriesNode.java} | 82 +++----
.../plan/node/source/VirtualSourceNode.java | 41 ++++
.../planner/plan/parameter/OrderByParameter.java | 10 +
.../plan/statement/sys/ShowQueriesStatement.java | 25 +++
.../execution/operator/MergeSortOperatorTest.java | 243 ++++++++++++++++++++-
.../mpp/execution/operator/OperatorMemoryTest.java | 19 +-
.../plan/plan/node/process/SortNodeSerdeTest.java | 10 +-
30 files changed, 1000 insertions(+), 131 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 5fa693b2a8..16c190a25e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -18,55 +18,120 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
public class SortOperator implements ProcessOperator {
+ private final OperatorContext operatorContext;
+ private final Operator inputOperator;
+ private final TsBlockBuilder tsBlockBuilder;
+
+ private List<MergeSortKey> cachedData;
+ private final Comparator<MergeSortKey> comparator;
+
+ public SortOperator(
+ OperatorContext operatorContext,
+ Operator inputOperator,
+ List<TSDataType> dataTypes,
+ Comparator<MergeSortKey> comparator) {
+ this.operatorContext = operatorContext;
+ this.inputOperator = inputOperator;
+ this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ this.cachedData = new ArrayList<>();
+ this.comparator = comparator;
+ }
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
public ListenableFuture<?> isBlocked() {
- return ProcessOperator.super.isBlocked();
+ return inputOperator.isBlocked();
}
@Override
public TsBlock next() {
- return null;
+ TsBlock tsBlock = inputOperator.nextWithTimer();
+ if (tsBlock == null) {
+ return null;
+ }
+ // add data of each TsBlock from child into list
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ cachedData.add(new MergeSortKey(tsBlock, i));
+ }
+ // child has more data, can't calculate
+ if (inputOperator.hasNextWithTimer()) {
+ return null;
+ }
+
+ if (cachedData.size() > 1) {
+ cachedData.sort(comparator);
+ }
+ TsBlock result = buildTsBlock();
+ cachedData = null;
+ return result;
+ }
+
+ private TsBlock buildTsBlock() {
+ TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ cachedData.forEach(
+ mergeSortKey -> {
+ TsBlock tsBlock = mergeSortKey.tsBlock;
+ int row = mergeSortKey.rowIndex;
+ timeColumnBuilder.writeLong(tsBlock.getTimeByIndex(row));
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ valueColumnBuilders[i].write(tsBlock.getColumn(i), row);
+ }
+ tsBlockBuilder.declarePosition();
+ });
+ return tsBlockBuilder.build();
}
@Override
public boolean hasNext() {
- return false;
+ return inputOperator.hasNextWithTimer();
}
@Override
public void close() throws Exception {
- ProcessOperator.super.close();
+ inputOperator.close();
}
@Override
public boolean isFinished() {
- return false;
+ return cachedData == null;
}
@Override
public long calculateMaxPeekMemory() {
- return 0;
+ // In fact, we need to cache all data from the input.
+ // For now the only child of this Operator is ShowQueries, which only returns one Block.
+ return inputOperator.calculateMaxPeekMemory()
+ + inputOperator.calculateRetainedSizeAfterCallingNext();
}
@Override
public long calculateMaxReturnSize() {
- return 0;
+ return inputOperator.calculateMaxReturnSize();
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
- return 0;
+ return inputOperator.calculateRetainedSizeAfterCallingNext();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
index da17deed4e..b25add1a25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
@@ -23,7 +23,11 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.commons.collections4.comparators.ComparatorChain;
+
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -123,23 +127,99 @@ public class MergeSortComparator {
: deviceComparing;
};
- public static Comparator<MergeSortKey> getComparator(List<SortItem> sortItemList) {
- if (sortItemList.get(0).getOrdering() == Ordering.ASC) {
- if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
- if (sortItemList.get(0).getSortKey() == SortKey.TIME) return ASC_TIME_ASC_DEVICE;
- else return ASC_DEVICE_ASC_TIME;
- } else {
- if (sortItemList.get(0).getSortKey() == SortKey.TIME) return ASC_TIME_DESC_DEVICE;
- else return ASC_DEVICE_DESC_TIME;
- }
- } else {
- if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
- if (sortItemList.get(0).getSortKey() == SortKey.TIME) return DESC_TIME_ASC_DEVICE;
- else return DESC_DEVICE_ASC_TIME;
+ public static Comparator<MergeSortKey> getComparator(
+ List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType> dataTypeList) {
+ // specialized for order by time, device or order by device, time
+ if (sortItemList.size() == 2
+ && ((sortItemList.get(0).getSortKey() == SortKey.TIME
+ && sortItemList.get(1).getSortKey() == SortKey.DEVICE)
+ || (sortItemList.get(0).getSortKey() == SortKey.DEVICE
+ && sortItemList.get(1).getSortKey() == SortKey.TIME))) {
+ if (sortItemList.get(0).getOrdering() == Ordering.ASC) {
+ if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
+ if (sortItemList.get(0).getSortKey() == SortKey.TIME) return ASC_TIME_ASC_DEVICE;
+ else return ASC_DEVICE_ASC_TIME;
+ } else {
+ if (sortItemList.get(0).getSortKey() == SortKey.TIME) return ASC_TIME_DESC_DEVICE;
+ else return ASC_DEVICE_DESC_TIME;
+ }
} else {
- if (sortItemList.get(0).getSortKey() == SortKey.TIME) return DESC_TIME_DESC_DEVICE;
- else return DESC_DEVICE_DESC_TIME;
+ if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
+ if (sortItemList.get(0).getSortKey() == SortKey.TIME) return DESC_TIME_ASC_DEVICE;
+ else return DESC_DEVICE_ASC_TIME;
+ } else {
+ if (sortItemList.get(0).getSortKey() == SortKey.TIME) return DESC_TIME_DESC_DEVICE;
+ else return DESC_DEVICE_DESC_TIME;
+ }
}
+ } else { // generally order by
+ // use code-gen to compile this comparator
+
+ // currently only show queries use merge sort and will only contain TIME, QUERYID, DATANODEID,
+ // ELAPSEDTIME, STATEMENT
+ return genComparatorChain(sortItemList, indexList, dataTypeList);
+ }
+ }
+
+ /** @param indexList -1 for time column */
+ private static ComparatorChain<MergeSortKey> genComparatorChain(
+ List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType> dataTypeList) {
+ List<Comparator<MergeSortKey>> list = new ArrayList<>(indexList.size());
+ for (int i = 0; i < indexList.size(); i++) {
+ int index = indexList.get(i);
+ TSDataType dataType = dataTypeList.get(i);
+ boolean asc = sortItemList.get(i).getOrdering() == Ordering.ASC;
+ list.add(genSingleComparator(asc, index, dataType));
+ }
+ return new ComparatorChain<>(list);
+ }
+
+ private static Comparator<MergeSortKey> genSingleComparator(
+ boolean asc, int index, TSDataType dataType) {
+ Comparator<MergeSortKey> comparator;
+ switch (dataType) {
+ case INT32:
+ comparator =
+ Comparator.comparingInt(
+ (MergeSortKey sortKey) ->
+ sortKey.tsBlock.getColumn(index).getInt(sortKey.rowIndex));
+ break;
+ case INT64:
+ if (index == -1) {
+ comparator =
+ Comparator.comparingLong(
+ (MergeSortKey sortKey) -> sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex));
+ } else {
+ comparator =
+ Comparator.comparingLong(
+ (MergeSortKey sortKey) ->
+ sortKey.tsBlock.getColumn(index).getLong(sortKey.rowIndex));
+ }
+ break;
+ case FLOAT:
+ comparator =
+ Comparator.comparingDouble(
+ (MergeSortKey sortKey) ->
+ sortKey.tsBlock.getColumn(index).getFloat(sortKey.rowIndex));
+ break;
+ case DOUBLE:
+ comparator =
+ Comparator.comparingDouble(
+ (MergeSortKey sortKey) ->
+ sortKey.tsBlock.getColumn(index).getDouble(sortKey.rowIndex));
+ break;
+ case TEXT:
+ comparator =
+ Comparator.comparing(
+ (MergeSortKey sortKey) ->
+ sortKey.tsBlock.getColumn(index).getBinary(sortKey.rowIndex));
+ break;
+ default:
+ throw new IllegalArgumentException("Data type: " + dataType + " cannot be ordered");
+ }
+ if (!asc) {
+ comparator = comparator.reversed();
}
+ return comparator;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
new file mode 100644
index 0000000000..0a9c86d0c2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class ShowQueriesOperator implements SourceOperator {
+
+ private final OperatorContext operatorContext;
+
+ private final PlanNodeId sourceId;
+
+ private TsBlock tsBlock;
+ private boolean hasConsumed;
+
+ private final Coordinator coordinator;
+
+ public ShowQueriesOperator(
+ OperatorContext operatorContext, PlanNodeId sourceId, Coordinator coordinator) {
+ this.operatorContext = operatorContext;
+ this.sourceId = sourceId;
+ this.coordinator = coordinator;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlock res = tsBlock;
+ hasConsumed = true;
+ tsBlock = null;
+ return res;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (hasConsumed) {
+ return false;
+ }
+ if (tsBlock == null) {
+ tsBlock = buildTsBlock();
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return hasConsumed;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0L;
+ }
+
+ @Override
+ public PlanNodeId getSourceId() {
+ return sourceId;
+ }
+
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ return NOT_BLOCKED;
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ private TsBlock buildTsBlock() {
+ List<TSDataType> outputDataTypes =
+ DatasetHeaderFactory.getShowQueriesHeader().getRespDataTypes();
+ TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ List<IQueryExecution> queryExecutions = coordinator.getAllQueryExecutions();
+
+ if (!queryExecutions.isEmpty()) {
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders();
+ long currTime = System.currentTimeMillis();
+ String[] splits = queryExecutions.get(0).getQueryId().split("_");
+ int DataNodeId = Integer.parseInt(splits[splits.length - 1]);
+
+ for (IQueryExecution queryExecution : queryExecutions) {
+ timeColumnBuilder.writeLong(queryExecution.getStartExecutionTime());
+ columnBuilders[0].writeBinary(Binary.valueOf(queryExecution.getQueryId()));
+ columnBuilders[1].writeInt(DataNodeId);
+ columnBuilders[2].writeFloat(
+ (float) (currTime - queryExecution.getStartExecutionTime()) / 1000);
+ columnBuilders[3].writeBinary(
+ Binary.valueOf(queryExecution.getExecuteSQL().orElse("UNKNOWN")));
+ builder.declarePosition();
+ }
+ }
+ return builder.build();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index e2bb953f26..3fb52f929b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -44,6 +44,8 @@ import org.apache.iotdb.db.utils.SetThreadName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -169,6 +171,10 @@ public class Coordinator {
return queryExecutionMap.get(queryId);
}
+ public List<IQueryExecution> getAllQueryExecutions() {
+ return new ArrayList<>(queryExecutionMap.values());
+ }
+
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ExecutorService getQueryExecutor() {
int coordinatorReadExecutorSize =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 58c362f38a..92f9018ca5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -163,6 +163,9 @@ public class Analysis {
// header of result dataset
private DatasetHeader respDatasetHeader;
+ // indicates whether the Nodes that produce source data are VirtualSourceNodes
+ private boolean isVirtualSource = false;
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// SELECT INTO Analysis
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -522,4 +525,12 @@ public class Analysis {
public void setRunningDataNodeLocations(List<TDataNodeLocation> runningDataNodeLocations) {
this.runningDataNodeLocations = runningDataNodeLocations;
}
+
+ public boolean isVirtualSource() {
+ return isVirtualSource;
+ }
+
+ public void setVirtualSource(boolean virtualSource) {
+ isVirtualSource = virtualSource;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index d4514bb892..accb761aa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -2516,16 +2516,38 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis = new Analysis();
analysis.setStatement(showQueriesStatement);
analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowQueriesHeader());
+ analysis.setVirtualSource(true);
List<TDataNodeLocation> allRunningDataNodeLocations = getRunningDataNodeLocations();
if (allRunningDataNodeLocations.isEmpty()) {
analysis.setFinishQueryAfterAnalyze(true);
}
// TODO Constant folding optimization for Where Predicate after True/False Constant introduced
+ if (allRunningDataNodeLocations.isEmpty()) {
+ throw new StatementAnalyzeException("no Running DataNodes");
+ }
analysis.setRunningDataNodeLocations(allRunningDataNodeLocations);
+ Set<Expression> sourceExpressions =
+ analysis.getRespDatasetHeader().getColumnHeaders().stream()
+ .map(
+ columnHeader -> {
+ try {
+ return new TimeSeriesOperand(
+ new MeasurementPath(
+ columnHeader.getColumnName(), columnHeader.getColumnType()));
+ } catch (IllegalPathException ignored) {
+ }
+ return null;
+ })
+ .collect(Collectors.toSet());
+ analysis.setSourceExpressions(sourceExpressions);
+ sourceExpressions.forEach(expression -> analyzeExpression(analysis, expression));
+
analyzeWhere(analysis, showQueriesStatement);
+ analysis.setMergeOrderParameter(new OrderByParameter(showQueriesStatement.getSortItemList()));
+
return analysis;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index 86e10f6a47..1d9f464667 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -51,6 +51,8 @@ public interface IQueryExecution {
String getQueryId();
+ long getStartExecutionTime();
+
void recordExecutionTime(long executionTime);
long getTotalExecutionTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index bda1af235e..7ed5f82aae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -629,6 +629,11 @@ public class QueryExecution implements IQueryExecution {
return context.getQueryId().getId();
}
+ @Override
+ public long getStartExecutionTime() {
+ return context.getStartTime();
+ }
+
@Override
public void recordExecutionTime(long executionTime) {
totalExecutionTime += executionTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index 29dc1b8aeb..8762cd748e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -219,6 +219,11 @@ public class ConfigExecution implements IQueryExecution {
return context.getQueryId().getId();
}
+ @Override
+ public long getStartExecutionTime() {
+ return context.getStartTime();
+ }
+
@Override
public void recordExecutionTime(long executionTime) {
totalExecutionTime += executionTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 385473deb6..9d868a23a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -2529,6 +2529,8 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
parseOffsetClause(ctx.rowPaginationClause().offsetClause()));
}
}
+
+ showQueriesStatement.setZoneId(zoneId);
return showQueriesStatement;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 744d3aceef..ae52725c12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.planner;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.AlignedPath;
@@ -64,6 +65,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -74,6 +76,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationType;
@@ -86,6 +89,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -1111,4 +1115,62 @@ public class LogicalPlanBuilder {
context.getQueryId().genPlanNodeId(), pathPatternList, templateId);
return this;
}
+
+ private LogicalPlanBuilder planSort(OrderByParameter orderByParameter) {
+ if (orderByParameter.isEmpty()) {
+ return this;
+ }
+ this.root = new SortNode(context.getQueryId().genPlanNodeId(), root, orderByParameter);
+ return this;
+ }
+
+ public LogicalPlanBuilder planShowQueries(Analysis analysis, ShowQueriesStatement statement) {
+ List<TDataNodeLocation> dataNodeLocations = analysis.getRunningDataNodeLocations();
+ if (dataNodeLocations.size() == 1) {
+ this.root =
+ planSingleShowQueries(dataNodeLocations.get(0))
+ .planFilterAndTransform(
+ analysis.getWhereExpression(),
+ analysis.getSourceExpressions(),
+ false,
+ statement.getZoneId(),
+ Ordering.ASC)
+ .planSort(analysis.getMergeOrderParameter())
+ .getRoot();
+ } else {
+ List<String> outputColumns = new ArrayList<>();
+ MergeSortNode mergeSortNode =
+ new MergeSortNode(
+ context.getQueryId().genPlanNodeId(),
+ analysis.getMergeOrderParameter(),
+ outputColumns);
+
+ dataNodeLocations.forEach(
+ dataNodeLocation ->
+ mergeSortNode.addChild(
+ this.planSingleShowQueries(dataNodeLocation)
+ .planFilterAndTransform(
+ analysis.getWhereExpression(),
+ analysis.getSourceExpressions(),
+ false,
+ statement.getZoneId(),
+ Ordering.ASC)
+ .planSort(analysis.getMergeOrderParameter())
+ .getRoot()));
+ outputColumns.addAll(mergeSortNode.getChildren().get(0).getOutputColumnNames());
+ this.root = mergeSortNode;
+ }
+
+ ColumnHeaderConstant.showQueriesColumnHeaders.forEach(
+ columnHeader ->
+ context
+ .getTypeProvider()
+ .setType(columnHeader.getColumnName(), columnHeader.getColumnType()));
+ return this;
+ }
+
+ private LogicalPlanBuilder planSingleShowQueries(TDataNodeLocation dataNodeLocation) {
+ this.root = new ShowQueriesNode(context.getQueryId().genPlanNodeId(), dataNodeLocation);
+ return this;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 7dd495ab78..1b78534118 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -712,7 +712,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
public PlanNode visitShowQueries(
ShowQueriesStatement showQueriesStatement, MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
- // TODO planBuilder = planBuilder.planShowQueries()
+ planBuilder =
+ planBuilder
+ .planShowQueries(analysis, showQueriesStatement) // push Filter down
+ .planOffset(showQueriesStatement.getRowOffset())
+ .planLimit(showQueriesStatement.getRowLimit());
return planBuilder.getRoot();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
index bd187f74f4..e257554300 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/MemoryDistributionCalculator.java
@@ -69,6 +69,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode;
@@ -424,6 +425,12 @@ public class MemoryDistributionCalculator
return null;
}
+ @Override
+ public Void visitShowQueries(ShowQueriesNode node, MemoryDistributionContext context) {
+ // do nothing since VirtualSourceNode will not have Exchange/FragmentSink as child
+ return null;
+ }
+
enum MemoryDistributionType {
/**
* This type means that this node needs data from all the children. For example, TimeJoinNode.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 75f07c6fc2..b0be9ca0d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
@@ -115,6 +116,8 @@ import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperat
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionTypeAnalyzer;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
@@ -168,6 +171,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
@@ -752,8 +756,43 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
List<SortItem> sortItemList = node.getMergeOrderParameter().getSortItemList();
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
+ List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+ List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size());
+ genSortInformation(
+ node.getOutputColumnNames(),
+ dataTypes,
+ sortItemList,
+ sortItemIndexList,
+ sortItemDataTypeList);
return new MergeSortOperator(
- operatorContext, children, dataTypes, MergeSortComparator.getComparator(sortItemList));
+ operatorContext,
+ children,
+ dataTypes,
+ MergeSortComparator.getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
+ }
+
+ private void genSortInformation(
+ List<String> outputColumnNames,
+ List<TSDataType> dataTypes,
+ List<SortItem> sortItemList,
+ List<Integer> sortItemIndexList,
+ List<TSDataType> sortItemDataTypeList) {
+ // TIME maps to index -1 / INT64; other keys map to the first output column with that name.
+ for (SortItem item : sortItemList) {
+ if (item.getSortKey() == SortKey.TIME) {
+ sortItemIndexList.add(-1);
+ sortItemDataTypeList.add(TSDataType.INT64);
+ continue;
+ }
+ for (int col = 0; col < outputColumnNames.size(); col++) {
+ if (item.getSortKey().toString().equalsIgnoreCase(outputColumnNames.get(col))) {
+ sortItemIndexList.add(col);
+ sortItemDataTypeList.add(dataTypes.get(col));
+ break;
+ }
+ }
+ }
+ }
@Override
@@ -1405,7 +1444,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitSort(SortNode node, LocalExecutionPlanContext context) {
- return super.visitSort(node, context);
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getInstanceContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SortOperator.class.getSimpleName());
+ List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+ List<SortItem> sortItemList = node.getOrderByParameter().getSortItemList();
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ // Resolve every sort item to its output-column index and data type for the comparator.
+ List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+ List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size());
+ genSortInformation(
+ node.getOutputColumnNames(),
+ dataTypes,
+ sortItemList,
+ sortItemIndexList,
+ sortItemDataTypeList);
+ return new SortOperator(
+ operatorContext,
+ child,
+ dataTypes,
+ MergeSortComparator.getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
}
@Override
@@ -1621,6 +1685,22 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return new VerticallyConcatOperator(operatorContext, children, outputColumnTypes);
}
+ @Override
+ public Operator visitShowQueries(ShowQueriesNode node, LocalExecutionPlanContext context) {
+ OperatorContext operatorContext =
+ context
+ .getInstanceContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ ShowQueriesOperator.class.getSimpleName());
+ // ShowQueriesNode has no children, so no child operators are generated here.
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+
+ return new ShowQueriesOperator(
+ operatorContext, node.getPlanNodeId(), Coordinator.getInstance());
+ }
+
private List<OutputColumn> generateOutputColumnsFromChildren(MultiChildProcessNode node) {
// TODO we should also sort the InputLocation for each column if they are not overlapped
return makeLayout(node).values().stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 965dcb1b56..0c564a3fa1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import java.util.List;
@@ -73,7 +74,8 @@ public class DistributionPlanner {
public DistributedQueryPlan planFragments() {
PlanNode rootAfterRewrite = rewriteSource();
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
- if (analysis.getStatement() instanceof QueryStatement) {
+ if (analysis.getStatement() instanceof QueryStatement
+ || analysis.getStatement() instanceof ShowQueriesStatement) {
analysis
.getRespDatasetHeader()
.setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index cc24dd0c97..2d48a9d0fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -217,6 +217,9 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
@Override
public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context) {
+ if (analysis.isVirtualSource()) {
+ return processMultiChildNodeByLocation(node, context);
+ }
// 1. Group children by dataRegion
Map<TRegionReplicaSet, List<PlanNode>> childrenGroupMap = new HashMap<>();
for (int i = 0; i < node.getChildren().size(); i++) {
@@ -412,6 +415,10 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
}
private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupContext context) {
+ if (analysis.isVirtualSource()) {
+ return processMultiChildNodeByLocation(node, context);
+ }
+
MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
List<PlanNode> visitedChildren = new ArrayList<>();
node.getChildren()
@@ -455,6 +462,23 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return newNode;
}
+ private PlanNode processMultiChildNodeByLocation(
+ MultiChildProcessNode node, NodeGroupContext context) {
+ MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
+ // Child 0 is attached directly; each remaining child is wrapped in a new ExchangeNode.
+ List<PlanNode> children = node.getChildren();
+ newNode.addChild(children.get(0)); // requires at least one child
+ for (int i = 1; i < children.size(); i++) {
+ PlanNode child = children.get(i);
+ ExchangeNode exchangeNode =
+ new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.setChild(child);
+ exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+ newNode.addChild(exchangeNode);
+ }
+ return newNode;
+ }
+
@Override
public PlanNode visitSlidingWindowAggregation(
SlidingWindowAggregationNode node, NodeGroupContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 96719c60cc..2ee598e597 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.slf4j.Logger;
@@ -114,21 +115,27 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// We need to store all the replica host in case of the scenario that the instance need to be
// redirected
// to another host when scheduling
- if ((analysis.getDataPartitionInfo() == null || analysis.getDataPartitionInfo().isEmpty())
- && (analysis.getStatement() instanceof QueryStatement
- && ((QueryStatement) analysis.getStatement()).isAggregationQuery())) {
- // AggregationQuery && no data region, we need to execute this FI on local
- fragmentInstance.setExecutorAndHost(
- new QueryExecutor(
- new TDataNodeLocation()
- .setInternalEndPoint(DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT)
- .setMPPDataExchangeEndPoint(DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT)));
+ if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) {
+ TDataNodeLocation dataNodeLocation = fragment.getTargetLocation();
+ if (dataNodeLocation != null) {
+ // currently only ShowQueries reaches this branch
+ fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation));
+ } else {
+ // neither a data region nor a dataNodeLocation: this FI has to be executed locally
+ // currently only an AggregationQuery that has schema but no data region reaches this branch
+ fragmentInstance.setExecutorAndHost(
+ new QueryExecutor(
+ new TDataNodeLocation()
+ .setInternalEndPoint(DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT)
+ .setMPPDataExchangeEndPoint(DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT)));
+ }
} else {
fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
}
- if (analysis.getStatement() instanceof QueryStatement) {
+ if (analysis.getStatement() instanceof QueryStatement
+ || analysis.getStatement() instanceof ShowQueriesStatement) {
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
}
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 20d703a945..80e446cf18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -79,7 +79,7 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDC
public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
- private Analysis analysis;
+ private final Analysis analysis;
public SourceRewriter(Analysis analysis) {
this.analysis = analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index c21cfa28fc..aac15e7940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.plan.planner.plan;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.VirtualSourceNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -89,7 +91,7 @@ public class PlanFragment {
return String.format("PlanFragment-%s", getId());
}
- // Every Fragment should only run in DataRegion.
+ // Every Fragment related to a DataPartition should only run in one DataRegion.
// But it can select any one of the Endpoint of the target DataRegion
// In current version, one PlanFragment should contain at least one SourceNode,
// and the DataRegions of all SourceNodes should be same in one PlanFragment.
@@ -98,6 +100,13 @@ public class PlanFragment {
return getNodeRegion(planNodeTree);
}
+ // If a Fragment is not related to a DataPartition,
+ // it may be related to a specific DataNode.
+ // This method returns the DataNodeLocation that will execute this Fragment (may be null).
+ public TDataNodeLocation getTargetLocation() {
+ return getNodeLocation(planNodeTree);
+ }
+
private TRegionReplicaSet getNodeRegion(PlanNode root) {
if (root instanceof IPartitionRelatedNode) {
return ((IPartitionRelatedNode) root).getRegionReplicaSet();
@@ -111,6 +120,19 @@ public class PlanFragment {
return null;
}
+ private TDataNodeLocation getNodeLocation(PlanNode root) {
+ if (root instanceof VirtualSourceNode) {
+ return ((VirtualSourceNode) root).getDataNodeLocation();
+ }
+ for (PlanNode child : root.getChildren()) { // depth-first; first non-null location wins
+ TDataNodeLocation result = getNodeLocation(child);
+ if (result != null) {
+ return result;
+ }
+ }
+ return null;
+ }
+
public PlanNode getPlanNodeById(PlanNodeId nodeId) {
return getPlanNodeById(planNodeTree, nodeId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index 58bbe7b371..dfdaa9e77b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -56,7 +56,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
-import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang3.Validate;
@@ -169,12 +168,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("MergeSort-%s", node.getPlanNodeId().getId()));
boxValue.add(String.format("ChildrenCount: %d", node.getChildren().size()));
- StringBuilder sortInfo = new StringBuilder("Order:");
- for (SortItem sortItem : node.getMergeOrderParameter().getSortItemList()) {
- sortInfo.append(" ");
- sortInfo.append(sortItem.getSortKey()).append(" ").append(sortItem.getOrdering());
- }
- boxValue.add(sortInfo.toString());
+ boxValue.add(node.getMergeOrderParameter().toString());
return render(node, boxValue, context);
}
@@ -284,7 +278,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
public List<String> visitSort(SortNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("Sort-%s", node.getPlanNodeId().getId()));
- boxValue.add(String.format("OrderBy: %s", node.getSortOrder()));
+ boxValue.add(node.getOrderByParameter().toString());
return render(node, boxValue, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 70999dc816..a289b1145b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -156,7 +157,8 @@ public enum PlanNodeType {
DEVICE_VIEW_INTO((short) 63),
VERTICALLY_CONCAT((short) 64),
SINGLE_DEVICE_VIEW((short) 65),
- MERGE_SORT((short) 66);
+ MERGE_SORT((short) 66),
+ SHOW_QUERIES((short) 67);
public static final int BYTES = Short.BYTES;
@@ -339,6 +341,8 @@ public enum PlanNodeType {
return SingleDeviceViewNode.deserialize(buffer);
case 66:
return MergeSortNode.deserialize(buffer);
+ case 67:
+ return ShowQueriesNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index f7eb7d5704..96422f5074 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -341,4 +342,8 @@ public abstract class PlanVisitor<R, C> {
public R visitVerticallyConcat(VerticallyConcatNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitShowQueries(ShowQueriesNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
index e033fd163d..f61b34e842 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
@@ -22,40 +22,36 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+
+import com.google.common.base.Objects;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Objects;
-/**
- * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
- * optimized logical query plan, the sortNode should not appear.
- */
public class SortNode extends SingleChildProcessNode {
- private final Ordering sortOrder;
+ private final OrderByParameter orderByParameter;
- public SortNode(PlanNodeId id, Ordering sortOrder) {
- super(id);
- this.sortOrder = sortOrder;
+ public SortNode(PlanNodeId id, PlanNode child, OrderByParameter orderByParameter) {
+ super(id, child);
+ this.orderByParameter = orderByParameter;
}
- public SortNode(PlanNodeId id, PlanNode child, Ordering sortOrder) {
- super(id, child);
- this.sortOrder = sortOrder;
+ public SortNode(PlanNodeId id, OrderByParameter orderByParameter) {
+ super(id);
+ this.orderByParameter = orderByParameter;
}
- public Ordering getSortOrder() {
- return sortOrder;
+ public OrderByParameter getOrderByParameter() {
+ return orderByParameter;
}
@Override
public PlanNode clone() {
- return new SortNode(getPlanNodeId(), sortOrder);
+ return new SortNode(getPlanNodeId(), child, orderByParameter);
}
@Override
@@ -71,38 +67,32 @@ public class SortNode extends SingleChildProcessNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.SORT.serialize(byteBuffer);
- ReadWriteIOUtils.write(sortOrder.ordinal(), byteBuffer);
+ orderByParameter.serializeAttributes(byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
PlanNodeType.SORT.serialize(stream);
- ReadWriteIOUtils.write(sortOrder.ordinal(), stream);
+ orderByParameter.serializeAttributes(stream);
}
public static SortNode deserialize(ByteBuffer byteBuffer) {
- Ordering orderBy = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ OrderByParameter orderByParameter = OrderByParameter.deserialize(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new SortNode(planNodeId, orderBy);
+ return new SortNode(planNodeId, orderByParameter);
}
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
SortNode sortNode = (SortNode) o;
- return sortOrder == sortNode.sortOrder;
+ return Objects.equal(orderByParameter, sortNode.orderByParameter);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), sortOrder);
+ return Objects.hashCode(super.hashCode(), orderByParameter);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/ShowQueriesNode.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/ShowQueriesNode.java
index e033fd163d..8ecdd89608 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/ShowQueriesNode.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,14 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -31,78 +33,78 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
-/**
- * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
- * optimized logical query plan, the sortNode should not appear.
- */
-public class SortNode extends SingleChildProcessNode {
+public class ShowQueriesNode extends VirtualSourceNode {
- private final Ordering sortOrder;
+ public static final List<String> SHOW_QUERIES_HEADER_COLUMNS =
+ ImmutableList.of(
+ ColumnHeaderConstant.QUERY_ID,
+ ColumnHeaderConstant.DATA_NODE_ID,
+ ColumnHeaderConstant.ELAPSED_TIME,
+ ColumnHeaderConstant.STATEMENT);
- public SortNode(PlanNodeId id, Ordering sortOrder) {
- super(id);
- this.sortOrder = sortOrder;
+ public ShowQueriesNode(PlanNodeId id, TDataNodeLocation dataNodeLocation) {
+ super(id, dataNodeLocation);
}
- public SortNode(PlanNodeId id, PlanNode child, Ordering sortOrder) {
- super(id, child);
- this.sortOrder = sortOrder;
+ @Override
+ public List<PlanNode> getChildren() {
+ return ImmutableList.of();
}
- public Ordering getSortOrder() {
- return sortOrder;
+ @Override
+ public void addChild(PlanNode child) {
+ throw new UnsupportedOperationException("no child is allowed for ShowQueriesNode");
}
@Override
public PlanNode clone() {
- return new SortNode(getPlanNodeId(), sortOrder);
+ return new ShowQueriesNode(getPlanNodeId(), getDataNodeLocation());
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
}
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
+ return SHOW_QUERIES_HEADER_COLUMNS;
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitSort(this, context);
+ return visitor.visitShowQueries(this, context);
}
+ // DataNodeLocation is only used during distribution planning, so it does not need to be
+ // serialized
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.SORT.serialize(byteBuffer);
- ReadWriteIOUtils.write(sortOrder.ordinal(), byteBuffer);
+ PlanNodeType.SHOW_QUERIES.serialize(byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
- PlanNodeType.SORT.serialize(stream);
- ReadWriteIOUtils.write(sortOrder.ordinal(), stream);
+ PlanNodeType.SHOW_QUERIES.serialize(stream);
}
- public static SortNode deserialize(ByteBuffer byteBuffer) {
- Ordering orderBy = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ public static ShowQueriesNode deserialize(ByteBuffer byteBuffer) {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new SortNode(planNodeId, orderBy);
+ return new ShowQueriesNode(planNodeId, null);
}
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- SortNode sortNode = (SortNode) o;
- return sortOrder == sortNode.sortOrder;
+ return super.equals(o);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), sortOrder);
+ return Objects.hash(super.hashCode());
+ }
+
+ @Override
+ public String toString() {
+ return "ShowQueriesNode-" + this.getPlanNodeId();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/VirtualSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/VirtualSourceNode.java
new file mode 100644
index 0000000000..b012826385
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/VirtualSourceNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+public abstract class VirtualSourceNode extends PlanNode {
+ // DataNode associated with this source; PlanFragment#getTargetLocation reads it.
+ private TDataNodeLocation dataNodeLocation;
+
+ public VirtualSourceNode(PlanNodeId id, TDataNodeLocation dataNodeLocation) {
+ super(id);
+ this.dataNodeLocation = dataNodeLocation;
+ }
+
+ public void setDataNodeLocation(TDataNodeLocation dataNodeLocation) {
+ this.dataNodeLocation = dataNodeLocation;
+ }
+
+ public TDataNodeLocation getDataNodeLocation() {
+ return dataNodeLocation;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OrderByParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OrderByParameter.java
index 6310eb7942..2adf7b1bc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OrderByParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OrderByParameter.java
@@ -50,6 +50,16 @@ public class OrderByParameter {
return sortItemList.isEmpty();
}
+ /** Renders "OrderBy:" followed by the SQL form of each sort item, each preceded by a space. */
+ @Override
+ public String toString() {
+ StringBuilder rendered = new StringBuilder("OrderBy:");
+ for (SortItem item : sortItemList) {
+ rendered.append(" ").append(item.toSQLString());
+ }
+ return rendered.toString();
+ }
+
public void serializeAttributes(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(sortItemList.size(), byteBuffer);
for (SortItem sortItem : sortItemList) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
index 33d492b1b1..1b3439519d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
@@ -21,9 +21,16 @@ package org.apache.iotdb.db.mpp.plan.statement.sys;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStatement;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+
public class ShowQueriesStatement extends ShowStatement {
private WhereCondition whereCondition;
@@ -33,6 +40,8 @@ public class ShowQueriesStatement extends ShowStatement {
private int rowLimit;
private int rowOffset;
+ private ZoneId zoneId;
+
public ShowQueriesStatement() {}
@Override
@@ -56,6 +65,14 @@ public class ShowQueriesStatement extends ShowStatement {
return orderByComponent;
}
+  /**
+   * Returns the sort items of this statement.
+   *
+   * @return the items of the ORDER BY component when one is set; otherwise a single-element list
+   *     ordering by time ascending
+   */
+  public List<SortItem> getSortItemList() {
+    return orderByComponent != null
+        ? orderByComponent.getSortItemList()
+        // default order
+        : Collections.singletonList(new SortItem(SortKey.TIME, Ordering.ASC));
+  }
+
public void setRowLimit(int rowLimit) {
this.rowLimit = rowLimit;
}
@@ -71,4 +88,12 @@ public class ShowQueriesStatement extends ShowStatement {
public int getRowOffset() {
return rowOffset;
}
+
+  /**
+   * Returns the zone id stored on this statement.
+   *
+   * @return the value last passed to {@link #setZoneId(ZoneId)}, or {@code null} if it was never set
+   */
+  public ZoneId getZoneId() {
+    return this.zoneId;
+  }
+
+  /**
+   * Stores the zone id on this statement.
+   *
+   * @param zoneId the zone id to keep; it is stored as-is without validation
+   */
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index cd5831502e..9a59a462e6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -20,30 +20,41 @@ package org.apache.iotdb.db.mpp.execution.operator;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.MergeSortOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SortOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MergeSortComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -53,14 +64,19 @@ import io.airlift.units.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -83,8 +99,12 @@ public class MergeSortOperatorTest {
private static final String DEVICE2 = MERGE_SORT_OPERATOR_TEST_SG + ".device2";
private static final String DEVICE3 = MERGE_SORT_OPERATOR_TEST_SG + ".device3";
+ private int dataNodeId;
+
@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
+ // Remember the currently configured DataNodeId so tearDown() can restore it, then pin it to 0
+ // for the duration of each test.
+ dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
SeriesReaderTestUtil.setUp(
measurementSchemas, deviceIds, seqResources, unSeqResources, MERGE_SORT_OPERATOR_TEST_SG);
}
@@ -92,6 +112,7 @@ public class MergeSortOperatorTest {
@After
public void tearDown() throws IOException {
- SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  try {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  } finally {
+    // Restore the DataNodeId saved in setUp() even when resource cleanup throws; otherwise the
+    // overridden value would stay in the shared config and leak into later tests.
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
+  }
}
long getValue(long expectedTime) {
@@ -317,7 +338,9 @@ public class MergeSortOperatorTest {
MergeSortComparator.getComparator(
Arrays.asList(
new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering))));
+ new SortItem(SortKey.DEVICE, deviceOrdering)),
+ null,
+ null));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -791,7 +814,9 @@ public class MergeSortOperatorTest {
MergeSortComparator.getComparator(
Arrays.asList(
new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering))));
+ new SortItem(SortKey.DEVICE, deviceOrdering)),
+ null,
+ null));
MergeSortOperator mergeSortOperator2 =
new MergeSortOperator(
fragmentInstanceContext.getOperatorContexts().get(15),
@@ -800,7 +825,9 @@ public class MergeSortOperatorTest {
MergeSortComparator.getComparator(
Arrays.asList(
new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering))));
+ new SortItem(SortKey.DEVICE, deviceOrdering)),
+ null,
+ null));
return new MergeSortOperator(
fragmentInstanceContext.getOperatorContexts().get(16),
@@ -809,7 +836,9 @@ public class MergeSortOperatorTest {
MergeSortComparator.getComparator(
Arrays.asList(
new SortItem(SortKey.TIME, timeOrdering),
- new SortItem(SortKey.DEVICE, deviceOrdering))));
+ new SortItem(SortKey.DEVICE, deviceOrdering)),
+ null,
+ null));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -1257,7 +1286,9 @@ public class MergeSortOperatorTest {
MergeSortComparator.getComparator(
Arrays.asList(
new SortItem(SortKey.DEVICE, deviceOrdering),
- new SortItem(SortKey.TIME, timeOrdering))));
+ new SortItem(SortKey.TIME, timeOrdering)),
+ null,
+ null));
} catch (IllegalPathException e) {
e.printStackTrace();
fail();
@@ -1472,4 +1503,206 @@ public class MergeSortOperatorTest {
}
assertEquals(count, 2000);
}
+
+ // ------------------------------------------------------------------------------------------------
+ // order by Time, DataNodeID
+ // ------------------------------------------------------------------------------------------------
+ //
+ // MergeSortOperator
+ // ___________|__________
+ // / \
+ // SortOperator SortOperator
+ // | |
+ // ShowQueriesOperator ShowQueriesOperator
+ // ------------------------------------------------------------------------------------------------
+  /**
+   * Feeds two mocked {@code ShowQueriesOperator}s through one {@code SortOperator} each and merges
+   * them with a {@code MergeSortOperator}, then checks that rows come out ordered by time
+   * ascending and, for equal times, by DataNodeId descending.
+   */
+  @Test
+  public void mergeSortWithSortOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId0 = new PlanNodeId("0");
+      fragmentInstanceContext.addOperatorContext(
+          0, planNodeId0, ShowQueriesOperator.class.getSimpleName());
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, ShowQueriesOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SortOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SortOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, MergeSortOperator.class.getSimpleName());
+
+      List<OperatorContext> operatorContexts = fragmentInstanceContext.getOperatorContexts();
+      List<TSDataType> dataTypes = DatasetHeaderFactory.getShowQueriesHeader().getRespDataTypes();
+      // The same comparator drives both the per-node sorts and the final merge.
+      Comparator<MergeSortKey> comparator =
+          MergeSortComparator.getComparator(
+              Arrays.asList(
+                  new SortItem(SortKey.TIME, Ordering.ASC),
+                  new SortItem(SortKey.DATANODEID, Ordering.DESC)),
+              ImmutableList.of(-1, 1),
+              ImmutableList.of(TSDataType.INT64, TSDataType.INT32));
+
+      // Each mocked coordinator reports its executions in a deliberately unsorted order.
+      Coordinator coordinator1 = Mockito.mock(Coordinator.class);
+      Mockito.when(coordinator1.getAllQueryExecutions())
+          .thenReturn(
+              ImmutableList.of(
+                  new FakeQueryExecution(3, "20221229_000000_00003_1", "sql3_node1"),
+                  new FakeQueryExecution(1, "20221229_000000_00001_1", "sql1_node1"),
+                  new FakeQueryExecution(2, "20221229_000000_00002_1", "sql2_node1")));
+      Coordinator coordinator2 = Mockito.mock(Coordinator.class);
+      Mockito.when(coordinator2.getAllQueryExecutions())
+          .thenReturn(
+              ImmutableList.of(
+                  new FakeQueryExecution(3, "20221229_000000_00003_2", "sql3_node2"),
+                  new FakeQueryExecution(2, "20221229_000000_00002_2", "sql2_node2"),
+                  new FakeQueryExecution(1, "20221229_000000_00001_2", "sql1_node2")));
+
+      ShowQueriesOperator showQueriesOperator1 =
+          new ShowQueriesOperator(operatorContexts.get(0), planNodeId0, coordinator1);
+      ShowQueriesOperator showQueriesOperator2 =
+          new ShowQueriesOperator(operatorContexts.get(1), planNodeId1, coordinator2);
+      SortOperator sortOperator1 =
+          new SortOperator(operatorContexts.get(2), showQueriesOperator1, dataTypes, comparator);
+      SortOperator sortOperator2 =
+          new SortOperator(operatorContexts.get(3), showQueriesOperator2, dataTypes, comparator);
+      Operator root =
+          new MergeSortOperator(
+              operatorContexts.get(4),
+              ImmutableList.of(sortOperator1, sortOperator2),
+              dataTypes,
+              comparator);
+
+      int index = 0;
+      // Time ASC
+      long[] expectedTime = new long[] {1, 1, 2, 2, 3, 3};
+      // DataNodeId DESC if Times are equal
+      String[] expectedQueryId =
+          new String[] {
+            "20221229_000000_00001_2",
+            "20221229_000000_00001_1",
+            "20221229_000000_00002_2",
+            "20221229_000000_00002_1",
+            "20221229_000000_00003_2",
+            "20221229_000000_00003_1"
+          };
+      int[] expectedDataNodeId = new int[] {2, 1, 2, 1, 2, 1};
+      String[] expectedStatement =
+          new String[] {
+            "sql1_node2", "sql1_node1", "sql2_node2", "sql2_node1", "sql3_node2", "sql3_node1"
+          };
+      while (root.hasNext()) {
+        TsBlock result = root.next();
+        if (result == null) {
+          // Nothing was produced on this call; ask the operator again.
+          continue;
+        }
+
+        for (int i = 0; i < result.getPositionCount(); i++, index++) {
+          assertEquals(expectedTime[index], result.getTimeColumn().getLong(i));
+          assertEquals(expectedQueryId[index], result.getColumn(0).getBinary(i).toString());
+          assertEquals(expectedDataNodeId[index], result.getColumn(1).getInt(i));
+          assertEquals(expectedStatement[index], result.getColumn(3).getBinary(i).toString());
+        }
+      }
+      // Guard against a vacuous pass: every expected row must actually have been produced.
+      assertEquals(expectedTime.length, index);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /**
+   * Minimal {@link IQueryExecution} stub: it carries a start time, a query id and a SQL text, and
+   * answers every other call with a neutral value ({@code null}, {@code 0}, {@code false}, an empty
+   * {@link Optional}) or does nothing.
+   */
+  static class FakeQueryExecution implements IQueryExecution {
+    private final long startTime;
+    private final String queryId;
+    private final String sql;
+
+    FakeQueryExecution(long startTime, String queryId, String sql) {
+      this.startTime = startTime;
+      this.queryId = queryId;
+      this.sql = sql;
+    }
+
+    // ---- values supplied through the constructor ----
+
+    @Override
+    public String getQueryId() {
+      return this.queryId;
+    }
+
+    @Override
+    public long getStartExecutionTime() {
+      return this.startTime;
+    }
+
+    @Override
+    public Optional<String> getExecuteSQL() {
+      return Optional.of(this.sql);
+    }
+
+    // ---- lifecycle and bookkeeping: intentionally empty ----
+
+    @Override
+    public void recordExecutionTime(long executionTime) {
+      // no-op
+    }
+
+    @Override
+    public void start() {
+      // no-op
+    }
+
+    @Override
+    public void stop() {
+      // no-op
+    }
+
+    @Override
+    public void stopAndCleanup() {
+      // no-op
+    }
+
+    // ---- everything else: fixed neutral answers ----
+
+    @Override
+    public long getTotalExecutionTime() {
+      return 0;
+    }
+
+    @Override
+    public Statement getStatement() {
+      return null;
+    }
+
+    @Override
+    public ExecutionResult getStatus() {
+      return null;
+    }
+
+    @Override
+    public Optional<TsBlock> getBatchResult() throws IoTDBException {
+      return Optional.empty();
+    }
+
+    @Override
+    public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
+      return Optional.empty();
+    }
+
+    @Override
+    public boolean hasNextResult() {
+      return false;
+    }
+
+    @Override
+    public int getOutputValueColumnCount() {
+      return 0;
+    }
+
+    @Override
+    public DatasetHeader getDatasetHeader() {
+      return null;
+    }
+
+    @Override
+    public boolean isQuery() {
+      return false;
+    }
+  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index 69d6b2382c..8d57229b44 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -445,10 +445,21 @@ public class OperatorMemoryTest {
@Test
public void sortOperatorTest() {
- SortOperator sortOperator = new SortOperator();
- assertEquals(0, sortOperator.calculateMaxPeekMemory());
- assertEquals(0, sortOperator.calculateMaxReturnSize());
- assertEquals(0, sortOperator.calculateRetainedSizeAfterCallingNext());
+  // Memory figures reported by the mocked child operator.
+  final long childMaxPeek = 2048L;
+  final long childMaxReturn = 1024L;
+  final long childRetained = 512L;
+
+  Operator childOperator = Mockito.mock(Operator.class);
+  Mockito.when(childOperator.calculateMaxPeekMemory()).thenReturn(childMaxPeek);
+  Mockito.when(childOperator.calculateMaxReturnSize()).thenReturn(childMaxReturn);
+  Mockito.when(childOperator.calculateRetainedSizeAfterCallingNext()).thenReturn(childRetained);
+
+  // The comparator is left null: only the memory estimates are exercised here.
+  OperatorContext mockedContext = Mockito.mock(OperatorContext.class);
+  SortOperator sortOperator =
+      new SortOperator(
+          mockedContext, childOperator, Collections.singletonList(TSDataType.INT32), null);
+
+  // Expected peak is the child's peak plus the child's retained size.
+  assertEquals(childMaxPeek + childRetained, sortOperator.calculateMaxPeekMemory());
+  assertEquals(childMaxReturn, sortOperator.calculateMaxReturnSize());
+  assertEquals(childRetained, sortOperator.calculateRetainedSizeAfterCallingNext());
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
index 91cee5952e..eb108b0efb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/SortNodeSerdeTest.java
@@ -24,11 +24,15 @@ import org.apache.iotdb.db.mpp.plan.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
@@ -48,7 +52,11 @@ public class SortNodeSerdeTest {
100,
100,
null);
- SortNode sortNode = new SortNode(new PlanNodeId("TestSortNode"), seriesScanNode, Ordering.ASC);
+ SortNode sortNode =
+ new SortNode(
+ new PlanNodeId("TestSortNode"),
+ seriesScanNode,
+ new OrderByParameter(ImmutableList.of(new SortItem(SortKey.TIME, Ordering.ASC))));
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
sortNode.serialize(byteBuffer);