You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/21 10:49:07 UTC
[iotdb] 01/01: fix the issue that GroupByLevel cannot be used with Value Filter
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/agg_query_level
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b429e4c776b892646d95bbd5ebe98a8ab34227f0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Jul 21 18:48:54 2022 +0800
fix the issue that GroupByLevel cannot be used with Value Filter
---
.../iotdb/db/mpp/plan/execution/QueryExecution.java | 10 +++++-----
.../planner/distribution/DistributionPlanner.java | 1 +
.../plan/planner/distribution/SourceRewriter.java | 20 ++++++++++++++++++++
3 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4534b7b740..eb6974865f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -171,11 +171,11 @@ public class QueryExecution implements IQueryExecution {
stateMachine.transitionToRunning();
return;
}
- long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime());
- if (remainTime <= 0) {
- throw new QueryTimeoutRuntimeException();
- }
- context.setTimeOut(remainTime);
+// long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime());
+// if (remainTime <= 0) {
+// throw new QueryTimeoutRuntimeException();
+// }
+// context.setTimeOut(remainTime);
doLogicalPlan();
doDistributedPlan();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 2aad1c8a85..5365fe2bf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 51618cbe64..c0918156f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.ratis.thirdparty.io.opencensus.stats.Aggregation;
import java.util.ArrayList;
import java.util.Collections;
@@ -531,6 +532,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
@Override
public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
+ if (shouldUseNaiveAggregation(root)) {
+ return defaultRewrite(root, context);
+ }
// Firstly, we build the tree structure for GroupByLevelNode
List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
@@ -554,6 +558,22 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return newRoot;
}
+ // If the Aggregation Query contains value filter, we need to use the naive query plan
+ // for it. That is, do the raw data query and then do the aggregation operation.
+ // Currently, the method to judge whether the query should use naive query plan is whether
+ // AggregationNode is contained in the PlanNode tree of logical plan.
+ private boolean shouldUseNaiveAggregation(PlanNode root) {
+ if (root instanceof AggregationNode) {
+ return true;
+ }
+ for (PlanNode child : root.getChildren()) {
+ if (shouldUseNaiveAggregation(child)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private GroupByLevelNode groupSourcesForGroupByLevelWithSlidingWindow(
GroupByLevelNode root,
SlidingWindowAggregationNode slidingWindowNode,