You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/06 08:00:10 UTC
[iotdb] 01/01: Only when the upstream operator is closed can the dependency driver be submitted
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch registerLazy
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 47033d5231a1c27ac9bfc0740875507ec5ef83f8
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Apr 6 15:56:35 2023 +0800
Only when the upstream operator is closed can the dependency driver be submitted
---
.../db/mpp/execution/driver/DriverContext.java | 18 ++++++++++++++++
.../iotdb/db/mpp/execution/driver/IDriver.java | 2 +-
.../mpp/execution/exchange/SharedTsBlockQueue.java | 25 ++++++++++++++++++----
.../operator/source/ExchangeOperator.java | 13 +++++++++++
.../db/mpp/execution/schedule/DriverScheduler.java | 7 +++---
.../db/mpp/execution/schedule/task/DriverTask.java | 18 ++++------------
.../plan/planner/LocalExecutionPlanContext.java | 4 ++++
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 3 ++-
.../db/mpp/plan/planner/PipelineDriverFactory.java | 15 +++++++------
.../execution/schedule/DriverSchedulerTest.java | 9 ++++----
10 files changed, 79 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 58a131545f..15c6dcc232 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -18,9 +18,11 @@
*/
package org.apache.iotdb.db.mpp.execution.driver;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -39,10 +41,18 @@ public class DriverContext {
private final List<OperatorContext> operatorContexts = new ArrayList<>();
private ISink sink;
private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+
private int dependencyDriverIndex = -1;
+ private ExchangeOperator upstreamOperator;
private final AtomicBoolean finished = new AtomicBoolean();
+ @TestOnly
+ public DriverContext() {
+ this.fragmentInstanceContext = null;
+ this.timeSliceAllocator = null;
+ }
+
public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
this.fragmentInstanceContext = fragmentInstanceContext;
this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
@@ -78,6 +88,14 @@ public class DriverContext {
return dependencyDriverIndex;
}
+ public void setUpstreamOperator(ExchangeOperator upstreamOperator) {
+ this.upstreamOperator = upstreamOperator;
+ }
+
+ public ExchangeOperator getUpstreamOperator() {
+ return upstreamOperator;
+ }
+
public void setSink(ISink sink) {
this.sink = sink;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 88514bddcc..a06ba08c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -72,5 +72,5 @@ public interface IDriver {
/** @return get Sink of current IDriver */
ISink getSink();
- int getDependencyDriverIndex();
+ DriverContext getDriverContext();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 34858dfaa4..6b1c2396a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -73,6 +73,7 @@ public class SharedTsBlockQueue {
private ListenableFuture<Void> blockedOnMemory;
private boolean closed = false;
+ private boolean alreadyRegistered = false;
private LocalSourceHandle sourceHandle;
private LocalSinkChannel sinkChannel;
@@ -80,6 +81,9 @@ public class SharedTsBlockQueue {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+ // When the sink channel of a pipeline driver closes, all dependency drivers can be submitted
+ private SettableFuture<Void> blockedDependencyDriver = null;
+
public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId,
String planNodeId,
@@ -91,10 +95,6 @@ public class SharedTsBlockQueue {
this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be null");
- localMemoryManager
- .getQueryPool()
- .registerPlanNodeIdToQueryMemoryMap(
- fragmentInstanceId.queryId, fullFragmentInstanceId, planNodeId);
}
public boolean hasNoMoreTsBlocks() {
@@ -207,6 +207,12 @@ public class SharedTsBlockQueue {
Validate.notNull(tsBlock, "TsBlock cannot be null");
Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
+ if (!alreadyRegistered) {
+ localMemoryManager
+ .getQueryPool()
+ .registerPlanNodeIdToQueryMemoryMap(
+ localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId);
+ }
Pair<ListenableFuture<Void>, Boolean> pair =
localMemoryManager
.getQueryPool()
@@ -264,6 +270,17 @@ public class SharedTsBlockQueue {
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ // Dependency driver must be submitted before this task is cleared
+ if (blockedDependencyDriver != null) {
+ this.blockedDependencyDriver.set(null);
+ }
+ }
+
+ public SettableFuture<Void> getBlockedDependencyDriver() {
+ if (blockedDependencyDriver == null) {
+ blockedDependencyDriver = SettableFuture.create();
+ }
+ return blockedDependencyDriver;
}
/** Destroy the queue and cancel the future. Should only be called in abnormal case */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 27ef9a3022..de23d93fb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
@@ -39,6 +40,8 @@ public class ExchangeOperator implements SourceOperator {
private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ private SettableFuture<Void> blockedDependencyDriver = null;
+
public ExchangeOperator(
OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) {
this.operatorContext = operatorContext;
@@ -122,5 +125,15 @@ public class ExchangeOperator implements SourceOperator {
@Override
public void close() throws Exception {
sourceHandle.close();
+ if (blockedDependencyDriver != null) {
+ blockedDependencyDriver.set(null);
+ }
+ }
+
+ public SettableFuture<Void> getBlockedDependencyDriver() {
+ if (blockedDependencyDriver == null) {
+ blockedDependencyDriver = SettableFuture.create();
+ }
+ return blockedDependencyDriver;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 6cd893f441..dc3fa2e959 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -188,9 +188,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
List<DriverTask> submittedTasks = new ArrayList<>();
for (DriverTask task : tasks) {
IDriver driver = task.getDriver();
- if (driver.getDependencyDriverIndex() != -1) {
+ int dependencyDriverIndex = driver.getDriverContext().getDependencyDriverIndex();
+ if (dependencyDriverIndex != -1) {
SettableFuture<?> blockedDependencyFuture =
- tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+ tasks.get(dependencyDriverIndex).getBlockedDependencyDriver();
blockedDependencyFuture.addListener(
() -> {
// Only if query is alive, we can submit this task
@@ -485,8 +486,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
} finally {
task.unlock();
}
- // Dependency driver must be submitted before this task is cleared
- task.submitDependencyDriver();
task.lock();
try {
clearDriverTask(task);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 20334d4c44..4768d222a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule.task;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
@@ -59,8 +60,6 @@ public class DriverTask implements IDIndexedAccessible {
private long lastEnterReadyQueueTime;
private long lastEnterBlockQueueTime;
- private SettableFuture<Void> blockedDependencyDriver = null;
-
/** Initialize a dummy instance for queryHolder */
public DriverTask() {
this(new StubFragmentInstance(), 0L, null, null);
@@ -140,17 +139,8 @@ public class DriverTask implements IDIndexedAccessible {
this.abortCause = abortCause;
}
- public void submitDependencyDriver() {
- if (blockedDependencyDriver != null) {
- this.blockedDependencyDriver.set(null);
- }
- }
-
public SettableFuture<Void> getBlockedDependencyDriver() {
- if (blockedDependencyDriver == null) {
- blockedDependencyDriver = SettableFuture.create();
- }
- return blockedDependencyDriver;
+ return driver.getDriverContext().getUpstreamOperator().getBlockedDependencyDriver();
}
public Priority getPriority() {
@@ -269,8 +259,8 @@ public class DriverTask implements IDIndexedAccessible {
}
@Override
- public int getDependencyDriverIndex() {
- return -1;
+ public DriverContext getDriverContext() {
+ return null;
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..01a42de6fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -144,6 +144,10 @@ public class LocalExecutionPlanContext {
return pipelineDriverFactories;
}
+ public PipelineDriverFactory getCurrentPipelineDriverFactory() {
+ return pipelineDriverFactories.get(pipelineDriverFactories.size() - 1);
+ }
+
public int getPipelineNumber() {
return pipelineDriverFactories.size();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1083fafcfc..a8c8e3ab84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2597,7 +2597,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
// Add dependency for all pipelines under current node
if (dependencyChildNode != 0) {
for (int i = originPipeNum; i < subContext.getPipelineNumber(); i++) {
- context.getPipelineDriverFactories().get(i).setDependencyPipeline(dependencyPipeId);
+ context.getPipelineDriverFactories().get(i).setDependencyPipeIndex(dependencyPipeId);
}
}
@@ -2614,6 +2614,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
context.getDriverContext()),
childNode.getPlanNodeId(),
childOperation.calculateMaxReturnSize());
+ context.getCurrentPipelineDriverFactory().setUpstreamOperator(sourceOperator);
context
.getTimeSliceAllocator()
.recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index 8cc9ec0e51..07626548e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import static java.util.Objects.requireNonNull;
@@ -34,7 +35,6 @@ public class PipelineDriverFactory {
private final DriverContext driverContext;
// TODO Use OperatorFactory to replace operator to generate multiple drivers for one pipeline
private final Operator operation;
- private int dependencyPipelineIndex = -1;
public PipelineDriverFactory(Operator operation, DriverContext driverContext) {
this.operation = requireNonNull(operation, "rootOperator is null");
@@ -54,9 +54,6 @@ public class PipelineDriverFactory {
} else {
driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext);
}
- if (dependencyPipelineIndex != -1) {
- driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
- }
return driver;
} catch (Throwable failure) {
try {
@@ -70,11 +67,15 @@ public class PipelineDriverFactory {
}
}
- public void setDependencyPipeline(int dependencyPipelineIndex) {
- this.dependencyPipelineIndex = dependencyPipelineIndex;
+ public void setDependencyPipeIndex(int dependencyDriverIndex) {
+ this.driverContext.setDependencyDriverIndex(dependencyDriverIndex);
}
public int getDependencyPipelineIndex() {
- return dependencyPipelineIndex;
+ return this.driverContext.getDependencyDriverIndex();
+ }
+
+ public void setUpstreamOperator(ExchangeOperator exchangeOperator) {
+ this.driverContext.setUpstreamOperator(exchangeOperator);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index ff8d0fdebf..1f26df098c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
@@ -63,12 +64,12 @@ public class DriverSchedulerTest {
DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
IDriver mockDriver1 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
- Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
+ Mockito.when(mockDriver1.getDriverContext()).thenReturn(new DriverContext());
FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
IDriver mockDriver2 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
- Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
+ Mockito.when(mockDriver2.getDriverContext()).thenReturn(new DriverContext());
List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -93,7 +94,7 @@ public class DriverSchedulerTest {
FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
- Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
+ Mockito.when(mockDriver3.getDriverContext()).thenReturn(new DriverContext());
manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
@@ -114,7 +115,7 @@ public class DriverSchedulerTest {
DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
IDriver mockDriver4 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
- Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
+ Mockito.when(mockDriver4.getDriverContext()).thenReturn(new DriverContext());
manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(2, manager.getQueryMap().size());