You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/31 05:57:08 UTC

[iotdb] branch master updated: Release resource of FI after all drivers have been closed

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6589f6ab121 Release resource of FI after all drivers have been closed
6589f6ab121 is described below

commit 6589f6ab121f9ab47882c92dce34f0a515b549e2
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Wed May 31 13:57:00 2023 +0800

    Release resource of FI after all drivers have been closed
---
 .../iotdb/db/mpp/execution/driver/DataDriver.java  |  2 +-
 .../db/mpp/execution/driver/SchemaDriver.java      |  2 +-
 .../fragment/FragmentInstanceContext.java          | 25 ++++++++++++++++++++++
 .../fragment/FragmentInstanceExecution.java        |  2 +-
 .../fragment/FragmentInstanceManager.java          |  2 ++
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  1 +
 6 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index ebf8d2c9114..c7ff76b8de2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -102,7 +102,7 @@ public class DataDriver extends Driver {
 
   @Override
   protected void releaseResource() {
-    // do nothing
+    driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
index 19847f83f0e..670670de310 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
@@ -39,6 +39,6 @@ public class SchemaDriver extends Driver {
 
   @Override
   protected void releaseResource() {
-    // do nothing
+    driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 01882493a5d..6434e8faa06 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -74,6 +75,8 @@ public class FragmentInstanceContext extends QueryContext {
   private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
   private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
 
+  private CountDownLatch allDriversClosed;
+
   // session info
   private SessionInfo sessionInfo;
 
@@ -352,6 +355,28 @@ public class FragmentInstanceContext extends QueryContext {
     }
   }
 
+  public void initializeNumOfDrivers(int numOfDrivers) {
+    // initialize with the num of Drivers
+    allDriversClosed = new CountDownLatch(numOfDrivers);
+  }
+
+  public void decrementNumOfUnClosedDriver() {
+    allDriversClosed.countDown();
+  }
+
+  public void releaseResourceWhenAllDriversAreClosed() {
+    while (true) {
+      try {
+        allDriversClosed.await();
+        break;
+      } catch (InterruptedException e) {
+        LOGGER.warn(
+            "Interrupted when await on allDriversClosed, FragmentInstance Id is {}", this.getId());
+      }
+    }
+    releaseResource();
+  }
+
   /**
    * All file paths used by this fragment instance must be cleared and thus the usage reference must
    * be decreased.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 4cb85b0ae93..04f6043bdc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -167,7 +167,7 @@ public class FragmentInstanceExecution {
             for (IDriver driver : drivers) {
               driver.close();
             }
-            context.releaseResource();
+            context.releaseResourceWhenAllDriversAreClosed();
             // help for gc
             drivers = null;
             MPPDataExchangeService.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index b0f816f21e0..8b570ddf68b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -144,6 +144,7 @@ public class FragmentInstanceManager {
 
                   List<IDriver> drivers = new ArrayList<>();
                   driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+                  context.initializeNumOfDrivers(drivers.size());
                   // get the sink of last driver
                   ISink sink = drivers.get(drivers.size() - 1).getSink();
 
@@ -205,6 +206,7 @@ public class FragmentInstanceManager {
 
                 List<IDriver> drivers = new ArrayList<>();
                 driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+                context.initializeNumOfDrivers(drivers.size());
                 // get the sink of last driver
                 ISink sink = drivers.get(drivers.size() - 1).getSink();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 571aea5403c..7a395cb205d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -173,6 +173,7 @@ public class DataDriverTest {
               dataRegion.query(driverContext.getPaths(), deviceId, fragmentInstanceContext, null))
           .thenReturn(new QueryDataSource(seqResources, unSeqResources));
       fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
+      fragmentInstanceContext.initializeNumOfDrivers(1);
 
       StubSink stubSink = new StubSink(fragmentInstanceContext);
       driverContext.setSink(stubSink);