You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/07 04:13:27 UTC

[iotdb] 01/01: [IOTDB-5760] Query is blocked because of no memory

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LazyRegister1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ee1a2aa19c5e89df4168b931bd8825d84fb4f66e
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Apr 7 12:11:58 2023 +0800

    [IOTDB-5760] Query is blocked because of no memory
    
    (cherry picked from commit e6ec5fcd1462a3c18042f1a866144a04af47eaf4)
---
 .../iotdb/db/mpp/execution/driver/DriverContext.java   | 18 ++++++++++++++++++
 .../apache/iotdb/db/mpp/execution/driver/IDriver.java  |  2 +-
 .../db/mpp/execution/exchange/SharedTsBlockQueue.java  |  8 ++++++++
 .../execution/operator/source/ExchangeOperator.java    | 13 +++++++++++++
 .../db/mpp/execution/schedule/DriverScheduler.java     |  7 +++----
 .../db/mpp/execution/schedule/task/DriverTask.java     | 18 ++++--------------
 .../db/mpp/plan/planner/LocalExecutionPlanContext.java |  4 ++++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java     |  1 +
 .../db/mpp/plan/planner/PipelineDriverFactory.java     | 15 ++++++++-------
 .../db/mpp/execution/schedule/DriverSchedulerTest.java |  9 +++++----
 10 files changed, 65 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 9b0ae75f28..2c2f5878c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.execution.driver;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -37,10 +39,18 @@ public class DriverContext {
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
   private ISink sink;
   private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+
   private int dependencyDriverIndex = -1;
+  private ExchangeOperator downstreamOperator;
 
   private final AtomicBoolean finished = new AtomicBoolean();
 
+  @TestOnly
+  public DriverContext() {
+    this.fragmentInstanceContext = null;
+    this.timeSliceAllocator = null;
+  }
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
     this.fragmentInstanceContext = fragmentInstanceContext;
     this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
@@ -68,6 +78,14 @@ public class DriverContext {
     return dependencyDriverIndex;
   }
 
+  public void setDownstreamOperator(ExchangeOperator downstreamOperator) {
+    this.downstreamOperator = downstreamOperator;
+  }
+
+  public ExchangeOperator getDownstreamOperator() {
+    return downstreamOperator;
+  }
+
   public void setSink(ISink sink) {
     this.sink = sink;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 88514bddcc..a06ba08c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -72,5 +72,5 @@ public interface IDriver {
   /** @return get Sink of current IDriver */
   ISink getSink();
 
-  int getDependencyDriverIndex();
+  DriverContext getDriverContext();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 53d668cc86..c0acc2177b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -73,6 +73,7 @@ public class SharedTsBlockQueue {
   private ListenableFuture<Void> blockedOnMemory;
 
   private boolean closed = false;
+  private boolean alreadyRegistered = false;
 
   private LocalSourceHandle sourceHandle;
   private LocalSinkChannel sinkChannel;
@@ -203,6 +204,13 @@ public class SharedTsBlockQueue {
 
     Validate.notNull(tsBlock, "TsBlock cannot be null");
     Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
+    if (!alreadyRegistered) {
+      localMemoryManager
+          .getQueryPool()
+          .registerPlanNodeIdToQueryMemoryMap(
+              localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId);
+      alreadyRegistered = true;
+    }
     Pair<ListenableFuture<Void>, Boolean> pair =
         localMemoryManager
             .getQueryPool()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 27ef9a3022..de23d93fb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
@@ -39,6 +40,8 @@ public class ExchangeOperator implements SourceOperator {
 
   private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
+  private SettableFuture<Void> blockedDependencyDriver = null;
+
   public ExchangeOperator(
       OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) {
     this.operatorContext = operatorContext;
@@ -122,5 +125,15 @@ public class ExchangeOperator implements SourceOperator {
   @Override
   public void close() throws Exception {
     sourceHandle.close();
+    if (blockedDependencyDriver != null) {
+      blockedDependencyDriver.set(null);
+    }
+  }
+
+  public SettableFuture<Void> getBlockedDependencyDriver() {
+    if (blockedDependencyDriver == null) {
+      blockedDependencyDriver = SettableFuture.create();
+    }
+    return blockedDependencyDriver;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 6cd893f441..dc3fa2e959 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -188,9 +188,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
     List<DriverTask> submittedTasks = new ArrayList<>();
     for (DriverTask task : tasks) {
       IDriver driver = task.getDriver();
-      if (driver.getDependencyDriverIndex() != -1) {
+      int dependencyDriverIndex = driver.getDriverContext().getDependencyDriverIndex();
+      if (dependencyDriverIndex != -1) {
         SettableFuture<?> blockedDependencyFuture =
-            tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+            tasks.get(dependencyDriverIndex).getBlockedDependencyDriver();
         blockedDependencyFuture.addListener(
             () -> {
               // Only if query is alive, we can submit this task
@@ -485,8 +486,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
       } finally {
         task.unlock();
       }
-      // Dependency driver must be submitted before this task is cleared
-      task.submitDependencyDriver();
       task.lock();
       try {
         clearDriverTask(task);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 20334d4c44..ccde6ae18f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule.task;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
@@ -59,8 +60,6 @@ public class DriverTask implements IDIndexedAccessible {
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
 
-  private SettableFuture<Void> blockedDependencyDriver = null;
-
   /** Initialize a dummy instance for queryHolder */
   public DriverTask() {
     this(new StubFragmentInstance(), 0L, null, null);
@@ -140,17 +139,8 @@ public class DriverTask implements IDIndexedAccessible {
     this.abortCause = abortCause;
   }
 
-  public void submitDependencyDriver() {
-    if (blockedDependencyDriver != null) {
-      this.blockedDependencyDriver.set(null);
-    }
-  }
-
   public SettableFuture<Void> getBlockedDependencyDriver() {
-    if (blockedDependencyDriver == null) {
-      blockedDependencyDriver = SettableFuture.create();
-    }
-    return blockedDependencyDriver;
+    return driver.getDriverContext().getDownstreamOperator().getBlockedDependencyDriver();
   }
 
   public Priority getPriority() {
@@ -269,8 +259,8 @@ public class DriverTask implements IDIndexedAccessible {
     }
 
     @Override
-    public int getDependencyDriverIndex() {
-      return -1;
+    public DriverContext getDriverContext() {
+      return null;
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..01a42de6fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -144,6 +144,10 @@ public class LocalExecutionPlanContext {
     return pipelineDriverFactories;
   }
 
+  public PipelineDriverFactory getCurrentPipelineDriverFactory() {
+    return pipelineDriverFactories.get(pipelineDriverFactories.size() - 1);
+  }
+
   public int getPipelineNumber() {
     return pipelineDriverFactories.size();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1ac76c4d52..110a7c2cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2596,6 +2596,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                       context.getDriverContext()),
                   childNode.getPlanNodeId(),
                   childOperation.calculateMaxReturnSize());
+          context.getCurrentPipelineDriverFactory().setDownstreamOperator(sourceOperator);
           context
               .getTimeSliceAllocator()
               .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index 8cc9ec0e51..6896a34766 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 
 import static java.util.Objects.requireNonNull;
 
@@ -34,7 +35,6 @@ public class PipelineDriverFactory {
   private final DriverContext driverContext;
   // TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline
   private final Operator operation;
-  private int dependencyPipelineIndex = -1;
 
   public PipelineDriverFactory(Operator operation, DriverContext driverContext) {
     this.operation = requireNonNull(operation, "rootOperator is null");
@@ -54,9 +54,6 @@ public class PipelineDriverFactory {
       } else {
         driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext);
       }
-      if (dependencyPipelineIndex != -1) {
-        driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
-      }
       return driver;
     } catch (Throwable failure) {
       try {
@@ -70,11 +67,15 @@ public class PipelineDriverFactory {
     }
   }
 
-  public void setDependencyPipeline(int dependencyPipelineIndex) {
-    this.dependencyPipelineIndex = dependencyPipelineIndex;
+  public void setDependencyPipeline(int dependencyDriverIndex) {
+    this.driverContext.setDependencyDriverIndex(dependencyDriverIndex);
   }
 
   public int getDependencyPipelineIndex() {
-    return dependencyPipelineIndex;
+    return this.driverContext.getDependencyDriverIndex();
+  }
+
+  public void setDownstreamOperator(ExchangeOperator exchangeOperator) {
+    this.driverContext.setDownstreamOperator(exchangeOperator);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index ff8d0fdebf..1f26df098c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
@@ -63,12 +64,12 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
     IDriver mockDriver1 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
-    Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver1.getDriverContext()).thenReturn(new DriverContext());
     FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
     DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
     IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
-    Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver2.getDriverContext()).thenReturn(new DriverContext());
     List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
     manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -93,7 +94,7 @@ public class DriverSchedulerTest {
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
     DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
     Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
-    Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver3.getDriverContext()).thenReturn(new DriverContext());
     manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
@@ -114,7 +115,7 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
     IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
-    Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver4.getDriverContext()).thenReturn(new DriverContext());
     manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(2, manager.getQueryMap().size());