You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/21 10:49:07 UTC

[iotdb] 01/01: fix the issue that GroupByLevel cannot be used with Value Filter

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/agg_query_level
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b429e4c776b892646d95bbd5ebe98a8ab34227f0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Jul 21 18:48:54 2022 +0800

    fix the issue that GroupByLevel cannot be used with Value Filter
---
 .../iotdb/db/mpp/plan/execution/QueryExecution.java  | 10 +++++-----
 .../planner/distribution/DistributionPlanner.java    |  1 +
 .../plan/planner/distribution/SourceRewriter.java    | 20 ++++++++++++++++++++
 3 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4534b7b740..eb6974865f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -171,11 +171,11 @@ public class QueryExecution implements IQueryExecution {
       stateMachine.transitionToRunning();
       return;
     }
-    long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime());
-    if (remainTime <= 0) {
-      throw new QueryTimeoutRuntimeException();
-    }
-    context.setTimeOut(remainTime);
+//    long remainTime = context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime());
+//    if (remainTime <= 0) {
+//      throw new QueryTimeoutRuntimeException();
+//    }
+//    context.setTimeOut(remainTime);
 
     doLogicalPlan();
     doDistributedPlan();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 2aad1c8a85..5365fe2bf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 51618cbe64..c0918156f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.ratis.thirdparty.io.opencensus.stats.Aggregation;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -531,6 +532,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
 
   @Override
   public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
+    if (shouldUseNaiveAggregation(root)) {
+      return defaultRewrite(root, context);
+    }
     // Firstly, we build the tree structure for GroupByLevelNode
     List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
     Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
@@ -554,6 +558,22 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return newRoot;
   }
 
+  // Returns true when the plan tree rooted at `root` contains an AggregationNode. That is the
+  // current signal for an aggregation query with a value filter, which must use the naive
+  // plan: run the raw data query first and then apply the aggregation on top of it.
+  private boolean shouldUseNaiveAggregation(PlanNode root) {
+    boolean found = root instanceof AggregationNode;
+    if (!found) {
+      for (PlanNode subtree : root.getChildren()) {
+        if (shouldUseNaiveAggregation(subtree)) {
+          found = true;
+          break;
+        }
+      }
+    }
+    return found;
+  }
+
   private GroupByLevelNode groupSourcesForGroupByLevelWithSlidingWindow(
       GroupByLevelNode root,
       SlidingWindowAggregationNode slidingWindowNode,