You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/27 11:08:08 UTC
[iotdb] branch ResultHandleNPE created (now 632c52d243)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a change to branch ResultHandleNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git
at 632c52d243 Avoid Result Handle clean up twice which will cause NPE
This branch includes the following new commits:
new 632c52d243 Avoid Result Handle clean up twice which will cause NPE
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[iotdb] 01/01: Avoid Result Handle clean up twice which will cause NPE
Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ResultHandleNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 632c52d2431e2dec8ee23f92fbe518967faf084b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Apr 27 19:07:53 2023 +0800
Avoid Result Handle clean up twice which will cause NPE
---
.../main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java | 2 +-
.../org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java | 7 ++++++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c3be5e1bbf..f92650bcde 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -125,7 +125,7 @@ public abstract class Driver implements IDriver {
tryWithLock(
100,
TimeUnit.MILLISECONDS,
- true,
+ false,
() -> {
// only keep doing query processing if driver state is still alive
if (state.get() == State.ALIVE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 199329aa93..715cf29c71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -126,6 +126,9 @@ public class QueryExecution implements IQueryExecution {
// We use this SourceHandle to fetch the TsBlock from it.
private ISourceHandle resultHandle;
+ // used for cleaning resultHandle up exactly once
+ private final AtomicBoolean resultHandleCleanUp;
+
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager;
@@ -186,6 +189,7 @@ public class QueryExecution implements IQueryExecution {
}
});
this.stopped = new AtomicBoolean(false);
+ this.resultHandleCleanUp = new AtomicBoolean(false);
}
@FunctionalInterface
@@ -259,6 +263,7 @@ public class QueryExecution implements IQueryExecution {
context.prepareForRetry();
// re-stop
this.stopped.compareAndSet(true, false);
+ this.resultHandleCleanUp.compareAndSet(true, false);
// re-analyze the query
this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
// re-start the QueryExecution
@@ -407,7 +412,7 @@ public class QueryExecution implements IQueryExecution {
// We don't need to deal with MemorySourceHandle because it doesn't register to memory pool
// We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream
// FragmentInstanceId to register
- if (resultHandle instanceof SourceHandle) {
+ if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) {
TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()