You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/27 11:08:08 UTC

[iotdb] branch ResultHandleNPE created (now 632c52d243)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ResultHandleNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 632c52d243 Avoid Result Handle clean up twice which will cause NPE

This branch includes the following new commits:

     new 632c52d243 Avoid Result Handle clean up twice which will cause NPE

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Avoid Result Handle clean up twice which will cause NPE

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ResultHandleNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 632c52d2431e2dec8ee23f92fbe518967faf084b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Apr 27 19:07:53 2023 +0800

    Avoid Result Handle clean up twice which will cause NPE
---
 .../main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java | 2 +-
 .../org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java     | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c3be5e1bbf..f92650bcde 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -125,7 +125,7 @@ public abstract class Driver implements IDriver {
         tryWithLock(
             100,
             TimeUnit.MILLISECONDS,
-            true,
+            false,
             () -> {
               // only keep doing query processing if driver state is still alive
               if (state.get() == State.ALIVE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 199329aa93..715cf29c71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -126,6 +126,9 @@ public class QueryExecution implements IQueryExecution {
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
 
+  // used for cleaning resultHandle up exactly once
+  private final AtomicBoolean resultHandleCleanUp;
+
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       syncInternalServiceClientManager;
 
@@ -186,6 +189,7 @@ public class QueryExecution implements IQueryExecution {
           }
         });
     this.stopped = new AtomicBoolean(false);
+    this.resultHandleCleanUp = new AtomicBoolean(false);
   }
 
   @FunctionalInterface
@@ -259,6 +263,7 @@ public class QueryExecution implements IQueryExecution {
     context.prepareForRetry();
     // re-stop
     this.stopped.compareAndSet(true, false);
+    this.resultHandleCleanUp.compareAndSet(true, false);
     // re-analyze the query
     this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
     // re-start the QueryExecution
@@ -407,7 +412,7 @@ public class QueryExecution implements IQueryExecution {
     // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool
     // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream
     // FragmentInstanceId to register
-    if (resultHandle instanceof SourceHandle) {
+    if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) {
       TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
       MPPDataExchangeService.getInstance()
           .getMPPDataExchangeManager()