You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/06 08:00:09 UTC

[iotdb] branch registerLazy created (now 47033d5231)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch registerLazy
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 47033d5231 Only when the upstream operator is closed, then dependency driver can be submitted

This branch includes the following new commits:

     new 47033d5231 Only when the upstream operator is closed, then dependency driver can be submitted

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Only when the upstream operator is closed, then dependency driver can be submitted

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch registerLazy
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 47033d5231a1c27ac9bfc0740875507ec5ef83f8
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Apr 6 15:56:35 2023 +0800

    Only when the upstream operator is closed, then dependency driver can be submitted
---
 .../db/mpp/execution/driver/DriverContext.java     | 18 ++++++++++++++++
 .../iotdb/db/mpp/execution/driver/IDriver.java     |  2 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 25 ++++++++++++++++++----
 .../operator/source/ExchangeOperator.java          | 13 +++++++++++
 .../db/mpp/execution/schedule/DriverScheduler.java |  7 +++---
 .../db/mpp/execution/schedule/task/DriverTask.java | 18 ++++------------
 .../plan/planner/LocalExecutionPlanContext.java    |  4 ++++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  3 ++-
 .../db/mpp/plan/planner/PipelineDriverFactory.java | 15 +++++++------
 .../execution/schedule/DriverSchedulerTest.java    |  9 ++++----
 10 files changed, 79 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 58a131545f..15c6dcc232 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.execution.driver;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -39,10 +41,18 @@ public class DriverContext {
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
   private ISink sink;
   private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+
   private int dependencyDriverIndex = -1;
+  private ExchangeOperator upstreamOperator;
 
   private final AtomicBoolean finished = new AtomicBoolean();
 
+  @TestOnly
+  public DriverContext() {
+    this.fragmentInstanceContext = null;
+    this.timeSliceAllocator = null;
+  }
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
     this.fragmentInstanceContext = fragmentInstanceContext;
     this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
@@ -78,6 +88,14 @@ public class DriverContext {
     return dependencyDriverIndex;
   }
 
+  public void setUpstreamOperator(ExchangeOperator upstreamOperator) {
+    this.upstreamOperator = upstreamOperator;
+  }
+
+  public ExchangeOperator getUpstreamOperator() {
+    return upstreamOperator;
+  }
+
   public void setSink(ISink sink) {
     this.sink = sink;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 88514bddcc..a06ba08c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -72,5 +72,5 @@ public interface IDriver {
   /** @return get Sink of current IDriver */
   ISink getSink();
 
-  int getDependencyDriverIndex();
+  DriverContext getDriverContext();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 34858dfaa4..6b1c2396a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -73,6 +73,7 @@ public class SharedTsBlockQueue {
   private ListenableFuture<Void> blockedOnMemory;
 
   private boolean closed = false;
+  private boolean alreadyRegistered = false;
 
   private LocalSourceHandle sourceHandle;
   private LocalSinkChannel sinkChannel;
@@ -80,6 +81,9 @@ public class SharedTsBlockQueue {
   private long maxBytesCanReserve =
       IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
 
+  // When the sink channel of a pipeline driver closes, all dependency drivers can be submitted
+  private SettableFuture<Void> blockedDependencyDriver = null;
+
   public SharedTsBlockQueue(
       TFragmentInstanceId fragmentInstanceId,
       String planNodeId,
@@ -91,10 +95,6 @@ public class SharedTsBlockQueue {
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
-    localMemoryManager
-        .getQueryPool()
-        .registerPlanNodeIdToQueryMemoryMap(
-            fragmentInstanceId.queryId, fullFragmentInstanceId, planNodeId);
   }
 
   public boolean hasNoMoreTsBlocks() {
@@ -207,6 +207,12 @@ public class SharedTsBlockQueue {
 
     Validate.notNull(tsBlock, "TsBlock cannot be null");
     Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
+    if (!alreadyRegistered) {
+      localMemoryManager
+          .getQueryPool()
+          .registerPlanNodeIdToQueryMemoryMap(
+              localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId);
+    }
     Pair<ListenableFuture<Void>, Boolean> pair =
         localMemoryManager
             .getQueryPool()
@@ -264,6 +270,17 @@ public class SharedTsBlockQueue {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    // Dependency driver must be submitted before this task is cleared
+    if (blockedDependencyDriver != null) {
+      this.blockedDependencyDriver.set(null);
+    }
+  }
+
+  public SettableFuture<Void> getBlockedDependencyDriver() {
+    if (blockedDependencyDriver == null) {
+      blockedDependencyDriver = SettableFuture.create();
+    }
+    return blockedDependencyDriver;
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 27ef9a3022..de23d93fb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
@@ -39,6 +40,8 @@ public class ExchangeOperator implements SourceOperator {
 
   private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
+  private SettableFuture<Void> blockedDependencyDriver = null;
+
   public ExchangeOperator(
       OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) {
     this.operatorContext = operatorContext;
@@ -122,5 +125,15 @@ public class ExchangeOperator implements SourceOperator {
   @Override
   public void close() throws Exception {
     sourceHandle.close();
+    if (blockedDependencyDriver != null) {
+      blockedDependencyDriver.set(null);
+    }
+  }
+
+  public SettableFuture<Void> getBlockedDependencyDriver() {
+    if (blockedDependencyDriver == null) {
+      blockedDependencyDriver = SettableFuture.create();
+    }
+    return blockedDependencyDriver;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 6cd893f441..dc3fa2e959 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -188,9 +188,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
     List<DriverTask> submittedTasks = new ArrayList<>();
     for (DriverTask task : tasks) {
       IDriver driver = task.getDriver();
-      if (driver.getDependencyDriverIndex() != -1) {
+      int dependencyDriverIndex = driver.getDriverContext().getDependencyDriverIndex();
+      if (dependencyDriverIndex != -1) {
         SettableFuture<?> blockedDependencyFuture =
-            tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+            tasks.get(dependencyDriverIndex).getBlockedDependencyDriver();
         blockedDependencyFuture.addListener(
             () -> {
               // Only if query is alive, we can submit this task
@@ -485,8 +486,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
       } finally {
         task.unlock();
       }
-      // Dependency driver must be submitted before this task is cleared
-      task.submitDependencyDriver();
       task.lock();
       try {
         clearDriverTask(task);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 20334d4c44..4768d222a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule.task;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
@@ -59,8 +60,6 @@ public class DriverTask implements IDIndexedAccessible {
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
 
-  private SettableFuture<Void> blockedDependencyDriver = null;
-
   /** Initialize a dummy instance for queryHolder */
   public DriverTask() {
     this(new StubFragmentInstance(), 0L, null, null);
@@ -140,17 +139,8 @@ public class DriverTask implements IDIndexedAccessible {
     this.abortCause = abortCause;
   }
 
-  public void submitDependencyDriver() {
-    if (blockedDependencyDriver != null) {
-      this.blockedDependencyDriver.set(null);
-    }
-  }
-
   public SettableFuture<Void> getBlockedDependencyDriver() {
-    if (blockedDependencyDriver == null) {
-      blockedDependencyDriver = SettableFuture.create();
-    }
-    return blockedDependencyDriver;
+    return driver.getDriverContext().getUpstreamOperator().getBlockedDependencyDriver();
   }
 
   public Priority getPriority() {
@@ -269,8 +259,8 @@ public class DriverTask implements IDIndexedAccessible {
     }
 
     @Override
-    public int getDependencyDriverIndex() {
-      return -1;
+    public DriverContext getDriverContext() {
+      return null;
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..01a42de6fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -144,6 +144,10 @@ public class LocalExecutionPlanContext {
     return pipelineDriverFactories;
   }
 
+  public PipelineDriverFactory getCurrentPipelineDriverFactory() {
+    return pipelineDriverFactories.get(pipelineDriverFactories.size() - 1);
+  }
+
   public int getPipelineNumber() {
     return pipelineDriverFactories.size();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1083fafcfc..a8c8e3ab84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2597,7 +2597,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           // Add dependency for all pipelines under current node
           if (dependencyChildNode != 0) {
             for (int i = originPipeNum; i < subContext.getPipelineNumber(); i++) {
-              context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId);
+              context.getPipelineDriverFactories().get(i).setDependencyPipeIndex(dependencyPipeId);
             }
           }
 
@@ -2614,6 +2614,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                       context.getDriverContext()),
                   childNode.getPlanNodeId(),
                   childOperation.calculateMaxReturnSize());
+          context.getCurrentPipelineDriverFactory().setUpstreamOperator(sourceOperator);
           context
               .getTimeSliceAllocator()
               .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index 8cc9ec0e51..07626548e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 
 import static java.util.Objects.requireNonNull;
 
@@ -34,7 +35,6 @@ public class PipelineDriverFactory {
   private final DriverContext driverContext;
   // TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline
   private final Operator operation;
-  private int dependencyPipelineIndex = -1;
 
   public PipelineDriverFactory(Operator operation, DriverContext driverContext) {
     this.operation = requireNonNull(operation, "rootOperator is null");
@@ -54,9 +54,6 @@ public class PipelineDriverFactory {
       } else {
         driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext);
       }
-      if (dependencyPipelineIndex != -1) {
-        driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
-      }
       return driver;
     } catch (Throwable failure) {
       try {
@@ -70,11 +67,15 @@ public class PipelineDriverFactory {
     }
   }
 
-  public void setDependencyPipeline(int dependencyPipelineIndex) {
-    this.dependencyPipelineIndex = dependencyPipelineIndex;
+  public void setDependencyPipeIndex(int dependencyDriverIndex) {
+    this.driverContext.setDependencyDriverIndex(dependencyDriverIndex);
   }
 
   public int getDependencyPipelineIndex() {
-    return dependencyPipelineIndex;
+    return this.driverContext.getDependencyDriverIndex();
+  }
+
+  public void setUpstreamOperator(ExchangeOperator exchangeOperator) {
+    this.driverContext.setUpstreamOperator(exchangeOperator);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index ff8d0fdebf..1f26df098c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
@@ -63,12 +64,12 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
     IDriver mockDriver1 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
-    Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver1.getDriverContext()).thenReturn(new DriverContext());
     FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
     DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
     IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
-    Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver2.getDriverContext()).thenReturn(new DriverContext());
     List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
     manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -93,7 +94,7 @@ public class DriverSchedulerTest {
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
     DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
     Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
-    Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver3.getDriverContext()).thenReturn(new DriverContext());
     manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
@@ -114,7 +115,7 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
     IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
-    Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver4.getDriverContext()).thenReturn(new DriverContext());
     manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(2, manager.getQueryMap().size());