You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/05/07 14:42:58 UTC

[iotdb] 02/03: add FastAggregationQueryExecution

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/fastFE
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 26e1aa2106df1ad85f043e32ac5d63cbd9f5db8d
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun May 7 22:42:03 2023 +0800

    add FastAggregationQueryExecution
---
 .../execution/FastAggregationQueryExecution.java   | 89 ++++++++++++++++++++++
 .../db/mpp/plan/execution/QueryExecution.java      | 16 ++--
 2 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastAggregationQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastAggregationQueryExecution.java
new file mode 100644
index 00000000000..da2b06921bd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastAggregationQueryExecution.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
+
+/** {@link QueryExecution} variant that skips logical planning; distributed planning is a TODO. */
+public class FastAggregationQueryExecution extends QueryExecution {
+  private static final Logger logger = LoggerFactory.getLogger(FastAggregationQueryExecution.class);
+
+  public FastAggregationQueryExecution(
+      Statement statement,
+      MPPQueryContext context,
+      ExecutorService executor,
+      ExecutorService writeOperationExecutor,
+      ScheduledExecutorService scheduledExecutor,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
+      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+          asyncInternalServiceClientManager) {
+    super(
+        statement,
+        context,
+        executor,
+        writeOperationExecutor,
+        scheduledExecutor,
+        partitionFetcher,
+        schemaFetcher,
+        syncInternalServiceClientManager,
+        asyncInternalServiceClientManager);
+  }
+
+  @Override
+  public void doLogicalPlan() {
+    // intentionally a no-op: this execution does not build a logical plan
+  }
+
+  @Override
+  public void doDistributedPlan() {
+    long startTime = System.nanoTime();
+    // TODO this.distributedPlan = planner.planFragments();
+
+    if (rawStatement.isQuery()) {
+      QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
+    }
+    // distributedPlan is not assigned yet (see TODO) and may be null, so guard the debug dump
+    if (isQuery() && distributedPlan != null && logger.isDebugEnabled()) {
+      logger.debug(
+          "distribution plan done. Fragment instance count is {}, details is: \n {}",
+          distributedPlan.getInstances().size(),
+          printFragmentInstances(distributedPlan.getInstances()));
+    }
+    // check timeout after planning because building the distribution plan can be time-consuming
+    checkTimeOutForQuery();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 715cf29c716..5f772747369 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -109,10 +109,10 @@ public class QueryExecution implements IQueryExecution {
 
   private final List<PlanOptimizer> planOptimizers;
 
-  private final Statement rawStatement;
-  private Analysis analysis;
+  protected final Statement rawStatement;
+  protected Analysis analysis;
   private LogicalQueryPlan logicalPlan;
-  private DistributedQueryPlan distributedPlan;
+  protected DistributedQueryPlan distributedPlan;
 
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
@@ -139,9 +139,9 @@ public class QueryExecution implements IQueryExecution {
 
   private long totalExecutionTime;
 
-  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+  protected static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
-  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
+  protected static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
       PerformanceOverviewMetrics.getInstance();
 
   public QueryExecution(
@@ -226,7 +226,7 @@ public class QueryExecution implements IQueryExecution {
     schedule();
   }
 
-  private void checkTimeOutForQuery() {
+  protected void checkTimeOutForQuery() {
     // only check query operation's timeout because we will never limit write operation's execution
     // time
     if (isQuery()) {
@@ -285,7 +285,7 @@ public class QueryExecution implements IQueryExecution {
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query need
-  private Analysis analyze(
+  protected Analysis analyze(
       Statement statement,
       MPPQueryContext context,
       IPartitionFetcher partitionFetcher,
@@ -362,7 +362,7 @@ public class QueryExecution implements IQueryExecution {
     checkTimeOutForQuery();
   }
 
-  private String printFragmentInstances(List<FragmentInstance> instances) {
+  protected String printFragmentInstances(List<FragmentInstance> instances) {
     StringBuilder ret = new StringBuilder();
     for (FragmentInstance instance : instances) {
       ret.append(System.lineSeparator()).append(instance);