You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/09 11:41:29 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] [IOTDB-5760] Query is blocked because of no memory
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new b89cd0da07 [To rel/1.1] [IOTDB-5760] Query is blocked because of no memory
b89cd0da07 is described below
commit b89cd0da07b7c2238424ef04a74c6ebf7307f8d1
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Sun Apr 9 19:41:21 2023 +0800
[To rel/1.1] [IOTDB-5760] Query is blocked because of no memory
---
.../db/mpp/execution/driver/DriverContext.java | 18 +++++++++++
.../iotdb/db/mpp/execution/driver/IDriver.java | 2 +-
.../operator/source/ExchangeOperator.java | 13 ++++++++
.../db/mpp/execution/schedule/DriverScheduler.java | 37 ++++++++++++----------
.../db/mpp/execution/schedule/task/DriverTask.java | 18 +++--------
.../plan/planner/LocalExecutionPlanContext.java | 4 +++
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 1 +
.../db/mpp/plan/planner/PipelineDriverFactory.java | 15 +++++----
.../execution/schedule/DriverSchedulerTest.java | 9 +++---
9 files changed, 75 insertions(+), 42 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 9b0ae75f28..2c2f5878c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -18,9 +18,11 @@
*/
package org.apache.iotdb.db.mpp.execution.driver;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskId;
import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -37,10 +39,18 @@ public class DriverContext {
private final List<OperatorContext> operatorContexts = new ArrayList<>();
private ISink sink;
private final RuleBasedTimeSliceAllocator timeSliceAllocator;
+
private int dependencyDriverIndex = -1;
+ private ExchangeOperator downstreamOperator;
private final AtomicBoolean finished = new AtomicBoolean();
+ @TestOnly
+ public DriverContext() {
+ this.fragmentInstanceContext = null;
+ this.timeSliceAllocator = null;
+ }
+
public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
this.fragmentInstanceContext = fragmentInstanceContext;
this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
@@ -68,6 +78,14 @@ public class DriverContext {
return dependencyDriverIndex;
}
+ public void setDownstreamOperator(ExchangeOperator downstreamOperator) {
+ this.downstreamOperator = downstreamOperator;
+ }
+
+ public ExchangeOperator getDownstreamOperator() {
+ return downstreamOperator;
+ }
+
public void setSink(ISink sink) {
this.sink = sink;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 88514bddcc..a06ba08c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -72,5 +72,5 @@ public interface IDriver {
/** @return get Sink of current IDriver */
ISink getSink();
- int getDependencyDriverIndex();
+ DriverContext getDriverContext();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 27ef9a3022..de23d93fb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
@@ -39,6 +40,8 @@ public class ExchangeOperator implements SourceOperator {
private long maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ private SettableFuture<Void> blockedDependencyDriver = null;
+
public ExchangeOperator(
OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) {
this.operatorContext = operatorContext;
@@ -122,5 +125,15 @@ public class ExchangeOperator implements SourceOperator {
@Override
public void close() throws Exception {
sourceHandle.close();
+ if (blockedDependencyDriver != null) {
+ blockedDependencyDriver.set(null);
+ }
+ }
+
+ public SettableFuture<Void> getBlockedDependencyDriver() {
+ if (blockedDependencyDriver == null) {
+ blockedDependencyDriver = SettableFuture.create();
+ }
+ return blockedDependencyDriver;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 90bf9b2e98..50862859e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -188,9 +188,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
List<DriverTask> submittedTasks = new ArrayList<>();
for (DriverTask task : tasks) {
IDriver driver = task.getDriver();
- if (driver.getDependencyDriverIndex() != -1) {
+ int dependencyDriverIndex = driver.getDriverContext().getDependencyDriverIndex();
+ if (dependencyDriverIndex != -1) {
SettableFuture<?> blockedDependencyFuture =
- tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
+ tasks.get(dependencyDriverIndex).getBlockedDependencyDriver();
blockedDependencyFuture.addListener(
() -> {
// Only if query is alive, we can submit this task
@@ -268,18 +269,24 @@ public class DriverScheduler implements IDriverScheduler, IService {
@Override
public void abortFragmentInstance(FragmentInstanceId instanceId) {
- Set<DriverTask> instanceRelatedTasks = queryMap.get(instanceId.getQueryId()).remove(instanceId);
- if (instanceRelatedTasks != null) {
- for (DriverTask task : instanceRelatedTasks) {
- if (task == null) {
- return;
- }
- task.lock();
- try {
- task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED);
- clearDriverTask(task);
- } finally {
- task.unlock();
+ Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks =
+ queryMap.get(instanceId.getQueryId());
+ if (queryRelatedTasks != null) {
+ Set<DriverTask> instanceRelatedTasks = queryRelatedTasks.remove(instanceId);
+ if (instanceRelatedTasks != null) {
+ synchronized (instanceRelatedTasks) {
+ for (DriverTask task : instanceRelatedTasks) {
+ if (task == null) {
+ return;
+ }
+ task.lock();
+ try {
+ task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED);
+ clearDriverTask(task);
+ } finally {
+ task.unlock();
+ }
+ }
}
}
}
@@ -481,8 +488,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
} finally {
task.unlock();
}
- // Dependency driver must be submitted before this task is cleared
- task.submitDependencyDriver();
task.lock();
try {
clearDriverTask(task);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 20334d4c44..ccde6ae18f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.schedule.task;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
@@ -59,8 +60,6 @@ public class DriverTask implements IDIndexedAccessible {
private long lastEnterReadyQueueTime;
private long lastEnterBlockQueueTime;
- private SettableFuture<Void> blockedDependencyDriver = null;
-
/** Initialize a dummy instance for queryHolder */
public DriverTask() {
this(new StubFragmentInstance(), 0L, null, null);
@@ -140,17 +139,8 @@ public class DriverTask implements IDIndexedAccessible {
this.abortCause = abortCause;
}
- public void submitDependencyDriver() {
- if (blockedDependencyDriver != null) {
- this.blockedDependencyDriver.set(null);
- }
- }
-
public SettableFuture<Void> getBlockedDependencyDriver() {
- if (blockedDependencyDriver == null) {
- blockedDependencyDriver = SettableFuture.create();
- }
- return blockedDependencyDriver;
+ return driver.getDriverContext().getDownstreamOperator().getBlockedDependencyDriver();
}
public Priority getPriority() {
@@ -269,8 +259,8 @@ public class DriverTask implements IDIndexedAccessible {
}
@Override
- public int getDependencyDriverIndex() {
- return -1;
+ public DriverContext getDriverContext() {
+ return null;
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..01a42de6fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -144,6 +144,10 @@ public class LocalExecutionPlanContext {
return pipelineDriverFactories;
}
+ public PipelineDriverFactory getCurrentPipelineDriverFactory() {
+ return pipelineDriverFactories.get(pipelineDriverFactories.size() - 1);
+ }
+
public int getPipelineNumber() {
return pipelineDriverFactories.size();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1ac76c4d52..110a7c2cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2596,6 +2596,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
context.getDriverContext()),
childNode.getPlanNodeId(),
childOperation.calculateMaxReturnSize());
+ context.getCurrentPipelineDriverFactory().setDownstreamOperator(sourceOperator);
context
.getTimeSliceAllocator()
.recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
index 8cc9ec0e51..6896a34766 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/PipelineDriverFactory.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import static java.util.Objects.requireNonNull;
@@ -34,7 +35,6 @@ public class PipelineDriverFactory {
private final DriverContext driverContext;
// TODO Use OperatorFactory to replace operator to generate multiple drivers for on pipeline
private final Operator operation;
- private int dependencyPipelineIndex = -1;
public PipelineDriverFactory(Operator operation, DriverContext driverContext) {
this.operation = requireNonNull(operation, "rootOperator is null");
@@ -54,9 +54,6 @@ public class PipelineDriverFactory {
} else {
driver = new SchemaDriver(operation, (SchemaDriverContext) driverContext);
}
- if (dependencyPipelineIndex != -1) {
- driver.getDriverContext().setDependencyDriverIndex(dependencyPipelineIndex);
- }
return driver;
} catch (Throwable failure) {
try {
@@ -70,11 +67,15 @@ public class PipelineDriverFactory {
}
}
- public void setDependencyPipeline(int dependencyPipelineIndex) {
- this.dependencyPipelineIndex = dependencyPipelineIndex;
+ public void setDependencyPipeline(int dependencyDriverIndex) {
+ this.driverContext.setDependencyDriverIndex(dependencyDriverIndex);
}
public int getDependencyPipelineIndex() {
- return dependencyPipelineIndex;
+ return this.driverContext.getDependencyDriverIndex();
+ }
+
+ public void setDownstreamOperator(ExchangeOperator exchangeOperator) {
+ this.driverContext.setDownstreamOperator(exchangeOperator);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
index c3de94f4b1..65d3c89e55 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask;
@@ -63,12 +64,12 @@ public class DriverSchedulerTest {
DriverTaskId driverTaskId1 = new DriverTaskId(instanceId1, 0);
IDriver mockDriver1 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
- Mockito.when(mockDriver1.getDependencyDriverIndex()).thenReturn(-1);
+ Mockito.when(mockDriver1.getDriverContext()).thenReturn(new DriverContext());
FragmentInstanceId instanceId2 = new FragmentInstanceId(fragmentId, "inst-1");
DriverTaskId driverTaskId2 = new DriverTaskId(instanceId2, 0);
IDriver mockDriver2 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver2.getDriverTaskId()).thenReturn(driverTaskId2);
- Mockito.when(mockDriver2.getDependencyDriverIndex()).thenReturn(-1);
+ Mockito.when(mockDriver2.getDriverContext()).thenReturn(new DriverContext());
List<IDriver> instances = Arrays.asList(mockDriver1, mockDriver2);
manager.submitDrivers(queryId, instances, QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
@@ -93,7 +94,7 @@ public class DriverSchedulerTest {
FragmentInstanceId instanceId3 = new FragmentInstanceId(fragmentId, "inst-2");
DriverTaskId driverTaskId3 = new DriverTaskId(instanceId3, 0);
Mockito.when(mockDriver3.getDriverTaskId()).thenReturn(driverTaskId3);
- Mockito.when(mockDriver3.getDependencyDriverIndex()).thenReturn(-1);
+ Mockito.when(mockDriver3.getDriverContext()).thenReturn(new DriverContext());
manager.submitDrivers(queryId, Collections.singletonList(mockDriver3), QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
@@ -114,7 +115,7 @@ public class DriverSchedulerTest {
DriverTaskId driverTaskId4 = new DriverTaskId(instanceId4, 0);
IDriver mockDriver4 = Mockito.mock(IDriver.class);
Mockito.when(mockDriver4.getDriverTaskId()).thenReturn(driverTaskId4);
- Mockito.when(mockDriver4.getDependencyDriverIndex()).thenReturn(-1);
+ Mockito.when(mockDriver4.getDriverContext()).thenReturn(new DriverContext());
manager.submitDrivers(queryId2, Collections.singletonList(mockDriver4), QUERY_TIMEOUT_MS);
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(2, manager.getQueryMap().size());