You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:00:57 UTC
[iotdb] 07/13: Fix schema pipeline bugs
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fc55b20473a47058322c29ba4e92f970be289320
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 9 22:34:33 2023 +0800
Fix schema pipeline bugs
---
.../mpp/execution/fragment/FragmentInstanceManager.java | 14 +++++++++-----
.../db/mpp/plan/planner/LocalExecutionPlanContext.java | 4 ++++
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 17 ++++-------------
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 2 +-
.../db/mpp/plan/planner/PipelineDriverFactory.java | 10 +++++++++-
5 files changed, 27 insertions(+), 20 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index efa8db0bf1..794174b939 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
@@ -41,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -195,14 +193,20 @@ public class FragmentInstanceManager {
fragmentInstanceId, stateMachine, instance.getSessionInfo()));
try {
- SchemaDriver driver =
+ List<PipelineDriverFactory> driverFactories =
planner.plan(instance.getFragment().getPlanNodeTree(), context, schemaRegion);
+
+ List<IDriver> drivers = new ArrayList<>();
+ driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+ // get the sinkHandle of last driver
+ ISinkHandle sinkHandle = drivers.get(drivers.size() - 1).getSinkHandle();
+
return createFragmentInstanceExecution(
scheduler,
instanceId,
context,
- Collections.singletonList(driver),
- driver.getSinkHandle(),
+ drivers,
+ sinkHandle,
stateMachine,
failedInstances,
instance.getTimeOut());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3132783e3e..2926e6a261 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -112,10 +112,14 @@ public class LocalExecutionPlanContext {
this.allSensorsMap = new ConcurrentHashMap<>();
this.typeProvider = null;
this.nextOperatorId = new AtomicInteger(0);
+ this.nextPipelineId = new AtomicInteger(0);
// there is no ttl in schema region, so we don't care this field
this.dataRegionTTL = Long.MAX_VALUE;
this.driverContext = new SchemaDriverContext(instanceContext, schemaRegion);
+ this.pipelineDriverFactories = new ArrayList<>();
+ // TODO combine with SchemaDriverContext
+ this.getNextPipelineId();
}
public void addPipelineDriverFactory(Operator operation, DriverContext driverContext) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 231ca1d680..91b3c5187f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -24,12 +24,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -81,7 +78,7 @@ public class LocalExecutionPlanner {
return context.getPipelineDriverFactories();
}
- public SchemaDriver plan(
+ public List<PipelineDriverFactory> plan(
PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion)
throws MemoryNotEnoughException {
LocalExecutionPlanContext context =
@@ -92,18 +89,12 @@ public class LocalExecutionPlanner {
// check whether current free memory is enough to execute current query
checkMemory(root, instanceContext.getStateMachine());
+ context.addPipelineDriverFactory(root, context.getDriverContext());
+
// set maxBytes one SourceHandle can reserve after visiting the whole tree
context.setMaxBytesOneHandleCanReserve();
- ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
- context
- .getDriverContext()
- .getOperatorContexts()
- .forEach(
- operatorContext ->
- operatorContext.setMaxRunTime(timeSliceAllocator.getMaxRunTime(operatorContext)));
-
- return new SchemaDriver(root, (SchemaDriverContext) context.getDriverContext());
+ return context.getPipelineDriverFactories();
}
private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachine)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index fd54e09b2c..845cb2be29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2243,7 +2243,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
int maxDop = Math.min(context.getDegreeOfParallelism(), localChildren.size() + 1);
int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildren.size());
- for (int i = 0; i < maxDop; i++) {
+ for (int i = 0; i < maxDop && i < localChildren.size(); i++) {
// Only if dop >= size(children) + 1, split all children to new pipeline
// Otherwise, the first group but not last will belong to the parent pipeline since the
// children number of last group is greaterEqual than the first group
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index fc21ed5c59..e71107b7af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -20,8 +20,11 @@
package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
+import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.Driver;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
+import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import static java.util.Objects.requireNonNull;
@@ -45,7 +48,12 @@ public class PipelineDriverFactory {
public Driver createDriver() {
requireNonNull(driverContext, "driverContext is null");
try {
- Driver driver = new DataDriver(operation, driverContext);
+ Driver driver = null;
+ if (driverContext instanceof DataDriverContext) {
+ driver = new DataDriver(operation, driverContext);
+ } else {
+ driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext);
+ }
if (dependencyPipelineIndex != -1) {
driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
}