You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:00:57 UTC

[iotdb] 07/13: Fix schema pipeline bugs

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fc55b20473a47058322c29ba4e92f970be289320
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 9 22:34:33 2023 +0800

    Fix schema pipeline bugs
---
 .../mpp/execution/fragment/FragmentInstanceManager.java | 14 +++++++++-----
 .../db/mpp/plan/planner/LocalExecutionPlanContext.java  |  4 ++++
 .../db/mpp/plan/planner/LocalExecutionPlanner.java      | 17 ++++-------------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java      |  2 +-
 .../db/mpp/plan/planner/PipelineDriverFactory.java      | 10 +++++++++-
 5 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index efa8db0bf1..794174b939 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
@@ -41,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -195,14 +193,20 @@ public class FragmentInstanceManager {
                               fragmentInstanceId, stateMachine, instance.getSessionInfo()));
 
               try {
-                SchemaDriver driver =
+                List<PipelineDriverFactory> driverFactories =
                     planner.plan(instance.getFragment().getPlanNodeTree(), context, schemaRegion);
+
+                List<IDriver> drivers = new ArrayList<>();
+                driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+                // get the sinkHandle of last driver
+                ISinkHandle sinkHandle = drivers.get(drivers.size() - 1).getSinkHandle();
+
                 return createFragmentInstanceExecution(
                     scheduler,
                     instanceId,
                     context,
-                    Collections.singletonList(driver),
-                    driver.getSinkHandle(),
+                    drivers,
+                    sinkHandle,
                     stateMachine,
                     failedInstances,
                     instance.getTimeOut());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3132783e3e..2926e6a261 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -112,10 +112,14 @@ public class LocalExecutionPlanContext {
     this.allSensorsMap = new ConcurrentHashMap<>();
     this.typeProvider = null;
     this.nextOperatorId = new AtomicInteger(0);
+    this.nextPipelineId = new AtomicInteger(0);
 
     // there is no ttl in schema region, so we don't care this field
     this.dataRegionTTL = Long.MAX_VALUE;
     this.driverContext = new SchemaDriverContext(instanceContext, schemaRegion);
+    this.pipelineDriverFactories = new ArrayList<>();
+    // TODO combine with SchemaDriverContext
+    this.getNextPipelineId();
   }
 
   public void addPipelineDriverFactory(Operator operation, DriverContext driverContext) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 231ca1d680..91b3c5187f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -24,12 +24,9 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.exception.MemoryNotEnoughException;
 import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
-import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -81,7 +78,7 @@ public class LocalExecutionPlanner {
     return context.getPipelineDriverFactories();
   }
 
-  public SchemaDriver plan(
+  public List<PipelineDriverFactory> plan(
       PlanNode plan, FragmentInstanceContext instanceContext, ISchemaRegion schemaRegion)
       throws MemoryNotEnoughException {
     LocalExecutionPlanContext context =
@@ -92,18 +89,12 @@ public class LocalExecutionPlanner {
     // check whether current free memory is enough to execute current query
     checkMemory(root, instanceContext.getStateMachine());
 
+    context.addPipelineDriverFactory(root, context.getDriverContext());
+
     // set maxBytes one SourceHandle can reserve after visiting the whole tree
     context.setMaxBytesOneHandleCanReserve();
 
-    ITimeSliceAllocator timeSliceAllocator = context.getTimeSliceAllocator();
-    context
-        .getDriverContext()
-        .getOperatorContexts()
-        .forEach(
-            operatorContext ->
-                operatorContext.setMaxRunTime(timeSliceAllocator.getMaxRunTime(operatorContext)));
-
-    return new SchemaDriver(root, (SchemaDriverContext) context.getDriverContext());
+    return context.getPipelineDriverFactories();
   }
 
   private void checkMemory(Operator root, FragmentInstanceStateMachine stateMachine)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index fd54e09b2c..845cb2be29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2243,7 +2243,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       int maxDop = Math.min(context.getDegreeOfParallelism(), localChildren.size() + 1);
       int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildren.size());
 
-      for (int i = 0; i < maxDop; i++) {
+      for (int i = 0; i < maxDop && i < localChildren.size(); i++) {
         // Only if dop >= size(children) + 1, split all children to new pipeline
         // Otherwise, the first group but not last will belong to the parent pipeline since the
         // children number of last group is greaterEqual than the first group
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index fc21ed5c59..e71107b7af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -20,8 +20,11 @@
 package org.apache.iotdb.db.mpp.plan.planner;
 
 import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
+import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.Driver;
 import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
+import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 
 import static java.util.Objects.requireNonNull;
@@ -45,7 +48,12 @@ public class PipelineDriverFactory {
   public Driver createDriver() {
     requireNonNull(driverContext, "driverContext is null");
     try {
-      Driver driver = new DataDriver(operation, driverContext);
+      Driver driver = null;
+      if (driverContext instanceof DataDriverContext) {
+        driver = new DataDriver(operation, driverContext);
+      } else {
+        driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext);
+      }
       if (dependencyPipelineIndex != -1) {
         driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
       }