You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/05/31 11:53:22 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] Release resource of FI after all drivers have been closed

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 0fb64817ac9 [To rel/1.1] Release resource of FI after all drivers have been closed
0fb64817ac9 is described below

commit 0fb64817ac93d7438732b623f641eb012755336d
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Wed May 31 19:53:16 2023 +0800

    [To rel/1.1] Release resource of FI after all drivers have been closed
---
 .../iotdb/db/mpp/execution/driver/DataDriver.java  |  2 +-
 .../db/mpp/execution/driver/SchemaDriver.java      |  2 +-
 .../fragment/FragmentInstanceContext.java          | 25 ++++++++++++++++++++++
 .../fragment/FragmentInstanceExecution.java        |  2 +-
 .../fragment/FragmentInstanceManager.java          |  2 ++
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  1 +
 6 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index d7378a67b00..a59fc1cda8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -97,7 +97,7 @@ public class DataDriver extends Driver {
 
   @Override
   protected void releaseResource() {
-    // do nothing
+    driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
index 19847f83f0e..670670de310 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
@@ -39,6 +39,6 @@ public class SchemaDriver extends Driver {
 
   @Override
   protected void releaseResource() {
-    // do nothing
+    driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 7e6257ef201..29e4ed0ee5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -72,6 +73,8 @@ public class FragmentInstanceContext extends QueryContext {
   private final AtomicReference<Long> lastExecutionStartTime = new AtomicReference<>();
   private final AtomicReference<Long> executionEndTime = new AtomicReference<>();
 
+  private CountDownLatch allDriversClosed;
+
   // session info
   private SessionInfo sessionInfo;
 
@@ -350,6 +353,28 @@ public class FragmentInstanceContext extends QueryContext {
     }
   }
 
+  public void initializeNumOfDrivers(int numOfDrivers) {
+    // initialize with the num of Drivers
+    allDriversClosed = new CountDownLatch(numOfDrivers);
+  }
+
+  public void decrementNumOfUnClosedDriver() {
+    allDriversClosed.countDown();
+  }
+
+  public void releaseResourceWhenAllDriversAreClosed() {
+    while (true) {
+      try {
+        allDriversClosed.await();
+        break;
+      } catch (InterruptedException e) {
+        LOGGER.warn(
+            "Interrupted when await on allDriversClosed, FragmentInstance Id is {}", this.getId());
+      }
+    }
+    releaseResource();
+  }
+
   /**
    * All file paths used by this fragment instance must be cleared and thus the usage reference must
    * be decreased.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 35a20199a27..9b452c3c1b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -146,7 +146,7 @@ public class FragmentInstanceExecution {
             for (IDriver driver : drivers) {
               driver.close();
             }
-            context.releaseResource();
+            context.releaseResourceWhenAllDriversAreClosed();
             // help for gc
             drivers = null;
             if (newState.isFailed()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 0b092ca0b8c..6866c9f0646 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -135,6 +135,7 @@ public class FragmentInstanceManager {
 
                   List<IDriver> drivers = new ArrayList<>();
                   driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+                  context.initializeNumOfDrivers(drivers.size());
                   // get the sink of last driver
                   ISink sink = drivers.get(drivers.size() - 1).getSink();
 
@@ -195,6 +196,7 @@ public class FragmentInstanceManager {
 
                 List<IDriver> drivers = new ArrayList<>();
                 driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
+                context.initializeNumOfDrivers(drivers.size());
                 // get the sink of last driver
                 ISink sink = drivers.get(drivers.size() - 1).getSink();
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 2c801a99823..3781959d7f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -173,6 +173,7 @@ public class DataDriverTest {
               dataRegion.query(driverContext.getPaths(), deviceId, fragmentInstanceContext, null))
           .thenReturn(new QueryDataSource(seqResources, unSeqResources));
       fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
+      fragmentInstanceContext.initializeNumOfDrivers(1);
 
       StubSink stubSink = new StubSink(fragmentInstanceContext);
       driverContext.setSink(stubSink);