You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/06 08:00:10 UTC

[iotdb] 01/01: Only when the upstream operator is closed, then dependency driver can be submitted

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch registerLazy
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 47033d5231a1c27ac9bfc0740875507ec5ef83f8
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Apr 6 15:56:35 2023 +0800

    Only when the upstream operator is closed, then dependency driver can be submitted
---
 .../db/mpp/execution/driver/DriverContext.java     | 18 ++++++++++++++++
 .../iotdb/db/mpp/execution/driver/IDriver.java     |  2 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 25 ++++++++++++++++++----
 .../operator/source/ExchangeOperator.java          | 13 +++++++++++
 .../db/mpp/execution/schedule/DriverScheduler.java |  7 +++---
 .../db/mpp/execution/schedule/task/DriverTask.java | 18 ++++------------
 .../plan/planner/LocalExecutionPlanContext.java    |  4 ++++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  3 ++-
 .../db/mpp/plan/planner/PipelineDriverFactory.java | 15 +++++++------
 .../execution/schedule/DriverSchedulerTest.java    |  9 ++++----
 10 files changed, 79 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 58a131545f..15c6dcc232 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.execution.driver;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -39,10 +41,18 @@ public class DriverContext {
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
   private ISink sink;
   private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+
   private int dependencyDriverIndex = -1;
+  private ExchangeOperator upstreamOperator;
 
   private final AtomicBoolean finished = new AtomicBoolean();
 
+  @TestOnly
+  public DriverContext() {
+    this.fragmentInstanceContext = null;
+    this.timeSliceAllocator = null;
+  }
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
     this.fragmentInstanceContext = fragmentInstanceContext;
     this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
@@ -78,6 +88,14 @@ public class DriverContext {
     return dependencyDriverIndex;
   }
 
+  public void setUpstreamOperator(ExchangeOperator upstreamOperator) {
+    this.upstreamOperator = upstreamOperator;
+  }
+
+  public ExchangeOperator getUpstreamOperator() {
+    return upstreamOperator;
+  }
+
   public void setSink(ISink sink) {
     this.sink = sink;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 88514bddcc..a06ba08c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -72,5 +72,5 @@ public interface IDriver {
   /** @return get Sink of current IDriver */
   ISink getSink();
 
-  int getDependencyDriverIndex();
+  DriverContext getDriverContext();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 34858dfaa4..6b1c2396a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -73,6 +73,7 @@ public class SharedTsBlockQueue {
   private ListenableFuture<Void> blockedOnMemory;
 
   private boolean closed = false;
+  private boolean alreadyRegistered = false;
 
   private LocalSourceHandle sourceHandle;
   private LocalSinkChannel sinkChannel;
@@ -80,6 +81,9 @@ public class SharedTsBlockQueue {
   private long maxBytesCanReserve =
       IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
 
+  // When the sink channel of a pipeline driver closes, all dependency drivers can be submitted
+  private SettableFuture<Void> blockedDependencyDriver = null;
+
   public SharedTsBlockQueue(
       TFragmentInstanceId fragmentInstanceId,
       String planNodeId,
@@ -91,10 +95,6 @@ public class SharedTsBlockQueue {
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
-    localMemoryManager
-        .getQueryPool()
-        .registerPlanNodeIdToQueryMemoryMap(
-            fragmentInstanceId.queryId, fullFragmentInstanceId, planNodeId);
   }
 
   public boolean hasNoMoreTsBlocks() {
@@ -207,6 +207,12 @@ public class SharedTsBlockQueue {
 
     Validate.notNull(tsBlock, "TsBlock cannot be null");
     Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
+    if (!alreadyRegistered) {
+      localMemoryManager
+          .getQueryPool()
+          .registerPlanNodeIdToQueryMemoryMap(
+              localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId);
+    }
     Pair<ListenableFuture<Void>, Boolean> pair =
         localMemoryManager
             .getQueryPool()
@@ -264,6 +270,17 @@ public class SharedTsBlockQueue {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    // Dependency driver must be submitted before this task is cleared
+    if (blockedDependencyDriver != null) {
+      this.blockedDependencyDriver.set(null);
+    }
+  }
+
+  public SettableFuture<Void> getBlockedDependencyDriver() {
+    if (blockedDependencyDriver == null) {
+      blockedDependencyDriver = SettableFuture.create();
+    }
+    return blockedDependencyDriver;
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 27ef9a3022..de23d93fb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
@@ -39,6 +40,8 @@ public class ExchangeOperator implements SourceOperator {
 
   private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
+  private SettableFuture<Void> blockedDependencyDriver = null;
+
   public ExchangeOperator(
       OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) {
     this.operatorContext = operatorContext;
@@ -122,5 +125,15 @@ public class ExchangeOperator implements SourceOperator {
   @Override
   public void close() throws Exception {
     sourceHandle.close();
+    if (blockedDependencyDriver != null) {
+      blockedDependencyDriver.set(null);
+    }
+  }
+
+  public SettableFuture<Void> getBlockedDependencyDriver() {
+    if (blockedDependencyDriver == null) {
+      blockedDependencyDriver = SettableFuture.create();
+    }
+    return blockedDependencyDriver;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 6cd893f441..dc3fa2e959 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -188,9 +188,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
     List<DriverTask> submittedTasks = new ArrayList<>();
     for (DriverTask task : tasks) {
       IDriver driver = task.getDriver();
-      if (driver.getDependencyDriverIndex() != -1) {
+      int dependencyDriverIndex = driver.getDriverContext().getDependencyDriverIndex();
+      if (dependencyDriverIndex != -1) {
         SettableFuture<?> blockedDependencyFuture =
-            tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+            tasks.get(dependencyDriverIndex).getBlockedDependencyDriver();
         blockedDependencyFuture.addListener(
             () -> {
               // Only if query is alive, we can submit this task
@@ -485,8 +486,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
       } finally {
         task.unlock();
       }
-      // Dependency driver must be submitted before this task is cleared
-      task.submitDependencyDriver();
       task.lock();
       try {
         clearDriverTask(task);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 20334d4c44..4768d222a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule.task;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
@@ -59,8 +60,6 @@ public class DriverTask implements IDIndexedAccessible {
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
 
-  private SettableFuture<Void> blockedDependencyDriver = null;
-
   /** Initialize a dummy instance for queryHolder */
   public DriverTask() {
     this(new StubFragmentInstance(), 0L, null, null);
@@ -140,17 +139,8 @@ public class DriverTask implements IDIndexedAccessible {
     this.abortCause = abortCause;
   }
 
-  public void submitDependencyDriver() {
-    if (blockedDependencyDriver != null) {
-      this.blockedDependencyDriver.set(null);
-    }
-  }
-
   public SettableFuture<Void> getBlockedDependencyDriver() {
-    if (blockedDependencyDriver == null) {
-      blockedDependencyDriver = SettableFuture.create();
-    }
-    return blockedDependencyDriver;
+    return driver.getDriverContext().getUpstreamOperator().getBlockedDependencyDriver();
   }
 
   public Priority getPriority() {
@@ -269,8 +259,8 @@ public class DriverTask implements IDIndexedAccessible {
     }
 
     @Override
-    public int getDependencyDriverIndex() {
-      return -1;
+    public DriverContext getDriverContext() {
+      return null;
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..01a42de6fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -144,6 +144,10 @@ public class LocalExecutionPlanContext {
     return pipelineDriverFactories;
   }
 
+  public PipelineDriverFactory getCurrentPipelineDriverFactory() {
+    return pipelineDriverFactories.get(pipelineDriverFactories.size() - 1);
+  }
+
   public int getPipelineNumber() {
     return pipelineDriverFactories.size();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1083fafcfc..a8c8e3ab84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2597,7 +2597,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           // Add dependency for all pipelines under current node
           if (dependencyChildNode != 0) {
             for (int i = originPipeNum; i < subContext.getPipelineNumber(); i++) {
-              context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId);
+              context.getPipelineDriverFactories().get(i).setDependencyPipeIndex(dependencyPipeId);
             }
           }
 
@@ -2614,6 +2614,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                       context.getDriverContext()),
                   childNode.getPlanNodeId(),
                   childOperation.calculateMaxReturnSize());
+          context.getCurrentPipelineDriverFactory().setUpstreamOperator(sourceOperator);
           context
               .getTimeSliceAllocator()
               .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index 8cc9ec0e51..07626548e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 
 import static java.util.Objects.requireNonNull;
 
@@ -34,7 +35,6 @@ public class PipelineDriverFactory {
   private final DriverContext driverContext;
   // TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline
   private final Operator operation;
-  private int dependencyPipelineIndex = -1;
 
   public PipelineDriverFactory(Operator operation, DriverContext driverContext) {
     this.operation = requireNonNull(operation, "rootOperator is null");
@@ -54,9 +54,6 @@ public class PipelineDriverFactory {
       } else {
         driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext);
       }
-      if (dependencyPipelineIndex != -1) {
-        driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
-      }
       return driver;
     } catch (Throwable failure) {
       try {
@@ -70,11 +67,15 @@ public class PipelineDriverFactory {
     }
   }
 
-  public void setDependencyPipeline(int dependencyPipelineIndex) {
-    this.dependencyPipelineIndex = dependencyPipelineIndex;
+  public void setDependencyPipeIndex(int dependencyDriverIndex) {
+    this.driverContext.setDependencyDriverIndex(dependencyDriverIndex);
   }
 
   public int getDependencyPipelineIndex() {
-    return dependencyPipelineIndex;
+    return this.driverContext.getDependencyDriverIndex();
+  }
+
+  public void setUpstreamOperator(ExchangeOperator exchangeOperator) {
+    this.driverContext.setUpstreamOperator(exchangeOperator);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index ff8d0fdebf..1f26df098c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
@@ -63,12 +64,12 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
     IDriver mockDriver1 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
-    Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver1.getDriverContext()).thenReturn(new DriverContext());
     FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
     DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
     IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
-    Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver2.getDriverContext()).thenReturn(new DriverContext());
     List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
     manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -93,7 +94,7 @@ public class DriverSchedulerTest {
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
     DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
     Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
-    Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver3.getDriverContext()).thenReturn(new DriverContext());
     manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
@@ -114,7 +115,7 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
     IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
-    Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver4.getDriverContext()).thenReturn(new DriverContext());
     manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(2, manager.getQueryMap().size());