You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/31 11:53:22 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] Release resource of FI after all drivers have been closed
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 0fb64817ac9 [To rel/1.1] Release resource of FI after all drivers have been closed
0fb64817ac9 is described below
commit 0fb64817ac93d7438732b623f641eb012755336d
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Wed May 31 19:53:16 2023 +0800
[To rel/1.1] Release resource of FI after all drivers have been closed
---
.../iotdb/db/mpp/execution/driver/DataDriver.java | 2 +-
.../db/mpp/execution/driver/SchemaDriver.java | 2 +-
.../fragment/FragmentInstanceContext.java | 25 ++++++++++++++++++++++
.../fragment/FragmentInstanceExecution.java | 2 +-
.../fragment/FragmentInstanceManager.java | 2 ++
.../iotdb/db/mpp/execution/DataDriverTest.java | 1 +
6 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index d7378a67b00..a59fc1cda8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -97,7 +97,7 @@ public class DataDriver extends Driver {
@Override
protected void releaseResource() {
- // do nothing
+ driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
index 19847f83f0e..670670de310 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
@@ -39,6 +39,6 @@ public class SchemaDriver extends Driver {
@Override
protected void releaseResource() {
- // do nothing
+ driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 7e6257ef201..29e4ed0ee5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -72,6 +73,8 @@ public class FragmentInstanceContext extends QueryContext {
private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
+ private CountDownLatch allDriversClosed;
+
// session info
private SessionInfo sessionInfo;
@@ -350,6 +353,28 @@ public class FragmentInstanceContext extends QueryContext {
}
}
+ /**
+  * Creates the latch used to wait until every driver of this fragment instance is closed.
+  *
+  * @param numOfDrivers number of drivers; becomes the initial count of the latch
+  */
+ public void initializeNumOfDrivers(int numOfDrivers) {
+   this.allDriversClosed = new CountDownLatch(numOfDrivers);
+ }
+
+ /**
+  * Counts the {@code allDriversClosed} latch down by one. In this change it is invoked from
+  * the {@code releaseResource()} hook of a driver.
+  */
+ public void decrementNumOfUnClosedDriver() {
+   this.allDriversClosed.countDown();
+ }
+
+ /**
+  * Blocks until the {@code allDriversClosed} latch reaches zero and then calls {@link
+  * #releaseResource()}.
+  *
+  * <p>The wait is intentionally not cancellable: an interrupt received while waiting is logged
+  * and the wait is retried, so {@code releaseResource()} only runs after the latch has opened.
+  * The interrupt is not lost, though. The thread's interrupt status is restored before this
+  * method returns, so callers further up the stack can still observe it.
+  */
+ public void releaseResourceWhenAllDriversAreClosed() {
+   boolean interrupted = false;
+   try {
+     while (true) {
+       try {
+         allDriversClosed.await();
+         break;
+       } catch (InterruptedException e) {
+         // await() cleared the interrupt flag when it threw; remember it so that it can be
+         // restored once the release has finished.
+         interrupted = true;
+         LOGGER.warn(
+             "Interrupted when await on allDriversClosed, FragmentInstance Id is {}", this.getId());
+       }
+     }
+     releaseResource();
+   } finally {
+     // Restore only after releaseResource() so the release itself does not run with the
+     // interrupt flag already set.
+     if (interrupted) {
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
/**
* All file paths used by this fragment instance must be cleared and thus the usage reference must
* be decreased.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 35a20199a27..9b452c3c1b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -146,7 +146,7 @@ public class FragmentInstanceExecution {
for (IDriver driver : drivers) {
driver.close();
}
- context.releaseResource();
+ context.releaseResourceWhenAllDriversAreClosed();
// help for gc
drivers = null;
if (newState.isFailed()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 0b092ca0b8c..6866c9f0646 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -135,6 +135,7 @@ public class FragmentInstanceManager {
List<IDriver> drivers = new ArrayList<>();
driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+ context.initializeNumOfDrivers(drivers.size());
// get the sink of last driver
ISink sink = drivers.get(drivers.size() - 1).getSink();
@@ -195,6 +196,7 @@ public class FragmentInstanceManager {
List<IDriver> drivers = new ArrayList<>();
driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+ context.initializeNumOfDrivers(drivers.size());
// get the sink of last driver
ISink sink = drivers.get(drivers.size() - 1).getSink();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 2c801a99823..3781959d7f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -173,6 +173,7 @@ public class DataDriverTest {
dataRegion.query(driverContext.getPaths(), deviceId, fragmentInstanceContext, null))
.thenReturn(new QueryDataSource(seqResources, unSeqResources));
fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
+ fragmentInstanceContext.initializeNumOfDrivers(1);
StubSink stubSink = new StubSink(fragmentInstanceContext);
driverContext.setSink(stubSink);