You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/14 12:06:34 UTC

[iotdb] branch master updated: [IOTDB-5210] Fix closed TsFileSequenceReader still cached in FileReaderManager

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b40d7b1727 [IOTDB-5210] Fix closed TsFileSequenceReader still cached in FileReaderManager
b40d7b1727 is described below

commit b40d7b17274df4d31ce2d595220fbfee93f70b08
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Dec 14 20:06:29 2022 +0800

    [IOTDB-5210] Fix closed TsFileSequenceReader still cached in FileReaderManager
---
 .../iotdb/db/mpp/execution/driver/Driver.java      | 31 +++++++++++++---------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 086234fa12..b743b90ef0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -100,12 +100,6 @@ public abstract class Driver implements IDriver {
   public ListenableFuture<?> processFor(Duration duration) {
 
     SettableFuture<?> blockedFuture = driverBlockedFuture.get();
-    // initialization may be time-consuming, so we keep it in the processFor method
-    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
-    // critical bug
-    if (!init(blockedFuture)) {
-      return blockedFuture;
-    }
 
     // if the driver is blocked we don't need to continue
     if (!blockedFuture.isDone()) {
@@ -120,13 +114,26 @@ public abstract class Driver implements IDriver {
             TimeUnit.MILLISECONDS,
             true,
             () -> {
-              long start = System.nanoTime();
-              do {
-                ListenableFuture<?> future = processInternal();
-                if (!future.isDone()) {
-                  return updateDriverBlockedFuture(future);
+              // only keep doing query processing if driver state is still alive
+              if (state.get() == State.ALIVE) {
+                long start = System.nanoTime();
+                // initialization may be time-consuming, so we keep it in the processFor method
+                // in normal case, it won't cause deadlock and should finish soon, otherwise it will
+                // be a
+                // critical bug
+                // We should do initialization after holding the lock to avoid parallelism problems
+                // with close
+                if (!init(blockedFuture)) {
+                  return blockedFuture;
                 }
-              } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
+
+                do {
+                  ListenableFuture<?> future = processInternal();
+                  if (!future.isDone()) {
+                    return updateDriverBlockedFuture(future);
+                  }
+                } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
+              }
               return NOT_BLOCKED;
             });