You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/07 04:13:26 UTC

[iotdb] branch LazyRegister1.1 created (now ee1a2aa19c)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch LazyRegister1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at ee1a2aa19c [IOTDB-5760] Query is blocked because of no memory

This branch includes the following new commits:

     new ee1a2aa19c [IOTDB-5760] Query is blocked because of no memory

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5760] Query is blocked because of no memory

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LazyRegister1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ee1a2aa19c5e89df4168b931bd8825d84fb4f66e
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Apr 7 12:11:58 2023 +0800

    [IOTDB-5760] Query is blocked because of no memory
    
    (cherry picked from commit e6ec5fcd1462a3c18042f1a866144a04af47eaf4)
---
 .../iotdb/db/mpp/execution/driver/DriverContext.java   | 18 ++++++++++++++++++
 .../apache/iotdb/db/mpp/execution/driver/IDriver.java  |  2 +-
 .../db/mpp/execution/exchange/SharedTsBlockQueue.java  |  8 ++++++++
 .../execution/operator/source/ExchangeOperator.java    | 13 +++++++++++++
 .../db/mpp/execution/schedule/DriverScheduler.java     |  7 +++----
 .../db/mpp/execution/schedule/task/DriverTask.java     | 18 ++++--------------
 .../db/mpp/plan/planner/LocalExecutionPlanContext.java |  4 ++++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java     |  1 +
 .../db/mpp/plan/planner/PipelineDriverFactory.java     | 15 ++++++++-------
 .../db/mpp/execution/schedule/DriverSchedulerTest.java |  9 +++++----
 10 files changed, 65 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 9b0ae75f28..2c2f5878c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.execution.driver;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
 import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -37,10 +39,18 @@ public class DriverContext {
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
   private ISink sink;
   private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+
   private int dependencyDriverIndex = -1;
+  private ExchangeOperator downstreamOperator;
 
   private final AtomicBoolean finished = new AtomicBoolean();
 
+  @TestOnly
+  public DriverContext() {
+    this.fragmentInstanceContext = null;
+    this.timeSliceAllocator = null;
+  }
+
   public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
     this.fragmentInstanceContext = fragmentInstanceContext;
     this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
@@ -68,6 +78,14 @@ public class DriverContext {
     return dependencyDriverIndex;
   }
 
+  public void setDownstreamOperator(ExchangeOperator downstreamOperator) {
+    this.downstreamOperator = downstreamOperator;
+  }
+
+  public ExchangeOperator getDownstreamOperator() {
+    return downstreamOperator;
+  }
+
   public void setSink(ISink sink) {
     this.sink = sink;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 88514bddcc..a06ba08c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -72,5 +72,5 @@ public interface IDriver {
   /** @return get Sink of current IDriver */
   ISink getSink();
 
-  int getDependencyDriverIndex();
+  DriverContext getDriverContext();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 53d668cc86..c0acc2177b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -73,6 +73,7 @@ public class SharedTsBlockQueue {
   private ListenableFuture<Void> blockedOnMemory;
 
   private boolean closed = false;
+  private boolean alreadyRegistered = false;
 
   private LocalSourceHandle sourceHandle;
   private LocalSinkChannel sinkChannel;
@@ -203,6 +204,13 @@ public class SharedTsBlockQueue {
 
     Validate.notNull(tsBlock, "TsBlock cannot be null");
     Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
+    if (!alreadyRegistered) {
+      localMemoryManager
+          .getQueryPool()
+          .registerPlanNodeIdToQueryMemoryMap(
+              localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId);
+      alreadyRegistered = true;
+    }
     Pair<ListenableFuture<Void>, Boolean> pair =
         localMemoryManager
             .getQueryPool()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 27ef9a3022..de23d93fb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
@@ -39,6 +40,8 @@ public class ExchangeOperator implements SourceOperator {
 
   private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
+  private SettableFuture<Void> blockedDependencyDriver = null;
+
   public ExchangeOperator(
       OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) {
     this.operatorContext = operatorContext;
@@ -122,5 +125,15 @@ public class ExchangeOperator implements SourceOperator {
   @Override
   public void close() throws Exception {
     sourceHandle.close();
+    if (blockedDependencyDriver != null) {
+      blockedDependencyDriver.set(null);
+    }
+  }
+
+  public SettableFuture<Void> getBlockedDependencyDriver() {
+    if (blockedDependencyDriver == null) {
+      blockedDependencyDriver = SettableFuture.create();
+    }
+    return blockedDependencyDriver;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 6cd893f441..dc3fa2e959 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -188,9 +188,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
     List<DriverTask> submittedTasks = new ArrayList<>();
     for (DriverTask task : tasks) {
       IDriver driver = task.getDriver();
-      if (driver.getDependencyDriverIndex() != -1) {
+      int dependencyDriverIndex = driver.getDriverContext().getDependencyDriverIndex();
+      if (dependencyDriverIndex != -1) {
         SettableFuture<?> blockedDependencyFuture =
-            tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+            tasks.get(dependencyDriverIndex).getBlockedDependencyDriver();
         blockedDependencyFuture.addListener(
             () -> {
               // Only if query is alive, we can submit this task
@@ -485,8 +486,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
       } finally {
         task.unlock();
       }
-      // Dependency driver must be submitted before this task is cleared
-      task.submitDependencyDriver();
       task.lock();
       try {
         clearDriverTask(task);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 20334d4c44..ccde6ae18f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule.task;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
@@ -59,8 +60,6 @@ public class DriverTask implements IDIndexedAccessible {
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
 
-  private SettableFuture<Void> blockedDependencyDriver = null;
-
   /** Initialize a dummy instance for queryHolder */
   public DriverTask() {
     this(new StubFragmentInstance(), 0L, null, null);
@@ -140,17 +139,8 @@ public class DriverTask implements IDIndexedAccessible {
     this.abortCause = abortCause;
   }
 
-  public void submitDependencyDriver() {
-    if (blockedDependencyDriver != null) {
-      this.blockedDependencyDriver.set(null);
-    }
-  }
-
   public SettableFuture<Void> getBlockedDependencyDriver() {
-    if (blockedDependencyDriver == null) {
-      blockedDependencyDriver = SettableFuture.create();
-    }
-    return blockedDependencyDriver;
+    return driver.getDriverContext().getDownstreamOperator().getBlockedDependencyDriver();
   }
 
   public Priority getPriority() {
@@ -269,8 +259,8 @@ public class DriverTask implements IDIndexedAccessible {
     }
 
     @Override
-    public int getDependencyDriverIndex() {
-      return -1;
+    public DriverContext getDriverContext() {
+      return null;
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..01a42de6fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -144,6 +144,10 @@ public class LocalExecutionPlanContext {
     return pipelineDriverFactories;
   }
 
+  public PipelineDriverFactory getCurrentPipelineDriverFactory() {
+    return pipelineDriverFactories.get(pipelineDriverFactories.size() - 1);
+  }
+
   public int getPipelineNumber() {
     return pipelineDriverFactories.size();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1ac76c4d52..110a7c2cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2596,6 +2596,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                       context.getDriverContext()),
                   childNode.getPlanNodeId(),
                   childOperation.calculateMaxReturnSize());
+          context.getCurrentPipelineDriverFactory().setDownstreamOperator(sourceOperator);
           context
               .getTimeSliceAllocator()
               .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index 8cc9ec0e51..6896a34766 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 
 import static java.util.Objects.requireNonNull;
 
@@ -34,7 +35,6 @@ public class PipelineDriverFactory {
   private final DriverContext driverContext;
   // TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline
   private final Operator operation;
-  private int dependencyPipelineIndex = -1;
 
   public PipelineDriverFactory(Operator operation, DriverContext driverContext) {
     this.operation = requireNonNull(operation, "rootOperator is null");
@@ -54,9 +54,6 @@ public class PipelineDriverFactory {
       } else {
         driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext);
       }
-      if (dependencyPipelineIndex != -1) {
-        driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
-      }
       return driver;
     } catch (Throwable failure) {
       try {
@@ -70,11 +67,15 @@ public class PipelineDriverFactory {
     }
   }
 
-  public void setDependencyPipeline(int dependencyPipelineIndex) {
-    this.dependencyPipelineIndex = dependencyPipelineIndex;
+  public void setDependencyPipeline(int dependencyDriverIndex) {
+    this.driverContext.setDependencyDriverIndex(dependencyDriverIndex);
   }
 
   public int getDependencyPipelineIndex() {
-    return dependencyPipelineIndex;
+    return this.driverContext.getDependencyDriverIndex();
+  }
+
+  public void setDownstreamOperator(ExchangeOperator exchangeOperator) {
+    this.driverContext.setDownstreamOperator(exchangeOperator);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index ff8d0fdebf..1f26df098c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
 import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
@@ -63,12 +64,12 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
     IDriver mockDriver1 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
-    Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver1.getDriverContext()).thenReturn(new DriverContext());
     FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
     DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
     IDriver mockDriver2 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
-    Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver2.getDriverContext()).thenReturn(new DriverContext());
     List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
     manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -93,7 +94,7 @@ public class DriverSchedulerTest {
     FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
     DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
     Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
-    Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver3.getDriverContext()).thenReturn(new DriverContext());
     manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(1, manager.getQueryMap().size());
@@ -114,7 +115,7 @@ public class DriverSchedulerTest {
     DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
     IDriver mockDriver4 = Mockito.mock(IDriver.class);
     Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
-    Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
+    Mockito.when(mockDriver4.getDriverContext()).thenReturn(new DriverContext());
     manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
     Assert.assertTrue(manager.getBlockedTasks().isEmpty());
     Assert.assertEquals(2, manager.getQueryMap().size());