You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/31 05:57:08 UTC
[iotdb] branch master updated: Release resource of FI after all drivers have been closed
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6589f6ab121 Release resource of FI after all drivers have been closed
6589f6ab121 is described below
commit 6589f6ab121f9ab47882c92dce34f0a515b549e2
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Wed May 31 13:57:00 2023 +0800
Release resource of FI after all drivers have been closed
---
.../iotdb/db/mpp/execution/driver/DataDriver.java | 2 +-
.../db/mpp/execution/driver/SchemaDriver.java | 2 +-
.../fragment/FragmentInstanceContext.java | 25 ++++++++++++++++++++++
.../fragment/FragmentInstanceExecution.java | 2 +-
.../fragment/FragmentInstanceManager.java | 2 ++
.../iotdb/db/mpp/execution/DataDriverTest.java | 1 +
6 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index ebf8d2c9114..c7ff76b8de2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -102,7 +102,7 @@ public class DataDriver extends Driver {
@Override
protected void releaseResource() {
- // do nothing
+ driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
index 19847f83f0e..670670de310 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
@@ -39,6 +39,6 @@ public class SchemaDriver extends Driver {
@Override
protected void releaseResource() {
- // do nothing
+ driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 01882493a5d..6434e8faa06 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -74,6 +75,8 @@ public class FragmentInstanceContext extends QueryContext {
private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
+ private CountDownLatch allDriversClosed;
+
// session info
private SessionInfo sessionInfo;
@@ -352,6 +355,28 @@ public class FragmentInstanceContext extends QueryContext {
}
}
+ public void initializeNumOfDrivers(int numOfDrivers) {
+ // initialize with the num of Drivers
+ allDriversClosed = new CountDownLatch(numOfDrivers);
+ }
+
+ public void decrementNumOfUnClosedDriver() {
+ allDriversClosed.countDown();
+ }
+
+ public void releaseResourceWhenAllDriversAreClosed() {
+ while (true) {
+ try {
+ allDriversClosed.await();
+ break;
+ } catch (InterruptedException e) {
+ LOGGER.warn(
+ "Interrupted when await on allDriversClosed, FragmentInstance Id is {}", this.getId());
+ }
+ }
+ releaseResource();
+ }
+
/**
* All file paths used by this fragment instance must be cleared and thus the usage reference must
* be decreased.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 4cb85b0ae93..04f6043bdc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -167,7 +167,7 @@ public class FragmentInstanceExecution {
for (IDriver driver : drivers) {
driver.close();
}
- context.releaseResource();
+ context.releaseResourceWhenAllDriversAreClosed();
// help for gc
drivers = null;
MPPDataExchangeService.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index b0f816f21e0..8b570ddf68b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -144,6 +144,7 @@ public class FragmentInstanceManager {
List<IDriver> drivers = new ArrayList<>();
driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+ context.initializeNumOfDrivers(drivers.size());
// get the sink of last driver
ISink sink = drivers.get(drivers.size() - 1).getSink();
@@ -205,6 +206,7 @@ public class FragmentInstanceManager {
List<IDriver> drivers = new ArrayList<>();
driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+ context.initializeNumOfDrivers(drivers.size());
// get the sink of last driver
ISink sink = drivers.get(drivers.size() - 1).getSink();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 571aea5403c..7a395cb205d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -173,6 +173,7 @@ public class DataDriverTest {
dataRegion.query(driverContext.getPaths(), deviceId, fragmentInstanceContext, null))
.thenReturn(new QueryDataSource(seqResources, unSeqResources));
fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
+ fragmentInstanceContext.initializeNumOfDrivers(1);
StubSink stubSink = new StubSink(fragmentInstanceContext);
driverContext.setSink(stubSink);