You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/05/07 14:42:58 UTC
[iotdb] 02/03: add FastAggregationQueryExecution
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/fastFE
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 26e1aa2106df1ad85f043e32ac5d63cbd9f5db8d
Author: liuminghui233 <54...@qq.com>
AuthorDate: Sun May 7 22:42:03 2023 +0800
add FastAggregationQueryExecution
---
.../execution/FastAggregationQueryExecution.java | 89 ++++++++++++++++++++++
.../db/mpp/plan/execution/QueryExecution.java | 16 ++--
2 files changed, 97 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastAggregationQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastAggregationQueryExecution.java
new file mode 100644
index 00000000000..da2b06921bd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastAggregationQueryExecution.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
+
+public class FastAggregationQueryExecution extends QueryExecution {
+ private static final Logger logger = LoggerFactory.getLogger(FastAggregationQueryExecution.class);
+
+ public FastAggregationQueryExecution(
+ Statement statement,
+ MPPQueryContext context,
+ ExecutorService executor,
+ ExecutorService writeOperationExecutor,
+ ScheduledExecutorService scheduledExecutor,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
+ IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager) {
+ super(
+ statement,
+ context,
+ executor,
+ writeOperationExecutor,
+ scheduledExecutor,
+ partitionFetcher,
+ schemaFetcher,
+ syncInternalServiceClientManager,
+ asyncInternalServiceClientManager);
+ }
+
+ @Override
+ public void doLogicalPlan() {
+ // do nothing
+ }
+
+ @Override
+ public void doDistributedPlan() {
+ long startTime = System.nanoTime();
+
+ // TODO this.distributedPlan = planner.planFragments();
+
+ if (rawStatement.isQuery()) {
+ QUERY_METRICS.recordPlanCost(DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
+ }
+ if (isQuery() && logger.isDebugEnabled()) {
+ logger.debug(
+ "distribution plan done. Fragment instance count is {}, details is: \n {}",
+ distributedPlan.getInstances().size(),
+ printFragmentInstances(distributedPlan.getInstances()));
+ }
+ // check timeout after building distribution plan because it could be time-consuming in some
+ // cases.
+ checkTimeOutForQuery();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 715cf29c716..5f772747369 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -109,10 +109,10 @@ public class QueryExecution implements IQueryExecution {
private final List<PlanOptimizer> planOptimizers;
- private final Statement rawStatement;
- private Analysis analysis;
+ protected final Statement rawStatement;
+ protected Analysis analysis;
private LogicalQueryPlan logicalPlan;
- private DistributedQueryPlan distributedPlan;
+ protected DistributedQueryPlan distributedPlan;
private final ExecutorService executor;
private final ExecutorService writeOperationExecutor;
@@ -139,9 +139,9 @@ public class QueryExecution implements IQueryExecution {
private long totalExecutionTime;
- private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+ protected static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
- private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
+ protected static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
PerformanceOverviewMetrics.getInstance();
public QueryExecution(
@@ -226,7 +226,7 @@ public class QueryExecution implements IQueryExecution {
schedule();
}
- private void checkTimeOutForQuery() {
+ protected void checkTimeOutForQuery() {
// only check query operation's timeout because we will never limit write operation's execution
// time
if (isQuery()) {
@@ -285,7 +285,7 @@ public class QueryExecution implements IQueryExecution {
}
// Analyze the statement in QueryContext. Generate the analysis this query need
- private Analysis analyze(
+ protected Analysis analyze(
Statement statement,
MPPQueryContext context,
IPartitionFetcher partitionFetcher,
@@ -362,7 +362,7 @@ public class QueryExecution implements IQueryExecution {
checkTimeOutForQuery();
}
- private String printFragmentInstances(List<FragmentInstance> instances) {
+ protected String printFragmentInstances(List<FragmentInstance> instances) {
StringBuilder ret = new StringBuilder();
for (FragmentInstance instance : instances) {
ret.append(System.lineSeparator()).append(instance);