You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/28 01:10:40 UTC

[iotdb] branch master updated: Avoid Result Handle clean up twice which will cause NPE

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e211451032 Avoid Result Handle clean up twice which will cause NPE
e211451032 is described below

commit e21145103293e30329db30a862ebe541dba4e16d
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Apr 28 09:10:31 2023 +0800

    Avoid Result Handle clean up twice which will cause NPE
---
 .../java/org/apache/iotdb/db/mpp/execution/driver/Driver.java    | 2 +-
 .../iotdb/db/mpp/execution/schedule/AbstractDriverThread.java    | 9 +++++++++
 .../org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java   | 7 ++++++-
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c3be5e1bbf..f92650bcde 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -125,7 +125,7 @@ public abstract class Driver implements IDriver {
         tryWithLock(
             100,
             TimeUnit.MILLISECONDS,
-            true,
+            false,
             () -> {
               // only keep doing query processing if driver state is still alive
               if (state.get() == State.ALIVE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index 72f5ce64a8..04db5adcb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -80,6 +80,15 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
             next.setAbortCause(DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
             scheduler.toAborted(next);
           }
+        } finally {
+          // Clear the interrupted flag on the current thread; driver cancellation may have
+          // triggered an interrupt.
+          if (Thread.interrupted()) {
+            if (closed) {
+              // restore the interrupted flag if the thread was closed before the interrupt
+              Thread.currentThread().interrupt();
+            }
+          }
         }
       }
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 199329aa93..715cf29c71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -126,6 +126,9 @@ public class QueryExecution implements IQueryExecution {
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
 
+  // used for cleaning resultHandle up exactly once
+  private final AtomicBoolean resultHandleCleanUp;
+
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       syncInternalServiceClientManager;
 
@@ -186,6 +189,7 @@ public class QueryExecution implements IQueryExecution {
           }
         });
     this.stopped = new AtomicBoolean(false);
+    this.resultHandleCleanUp = new AtomicBoolean(false);
   }
 
   @FunctionalInterface
@@ -259,6 +263,7 @@ public class QueryExecution implements IQueryExecution {
     context.prepareForRetry();
     // re-stop
     this.stopped.compareAndSet(true, false);
+    this.resultHandleCleanUp.compareAndSet(true, false);
     // re-analyze the query
     this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
     // re-start the QueryExecution
@@ -407,7 +412,7 @@ public class QueryExecution implements IQueryExecution {
     // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool
     // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream
     // FragmentInstanceId to register
-    if (resultHandle instanceof SourceHandle) {
+    if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) {
       TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
       MPPDataExchangeService.getInstance()
           .getMPPDataExchangeManager()