You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/28 01:10:40 UTC
[iotdb] branch master updated: Avoid Result Handle clean up twice which will cause NPE
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e211451032 Avoid Result Handle clean up twice which will cause NPE
e211451032 is described below
commit e21145103293e30329db30a862ebe541dba4e16d
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Apr 28 09:10:31 2023 +0800
Avoid Result Handle clean up twice which will cause NPE
---
.../java/org/apache/iotdb/db/mpp/execution/driver/Driver.java | 2 +-
.../iotdb/db/mpp/execution/schedule/AbstractDriverThread.java | 9 +++++++++
.../org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java | 7 ++++++-
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c3be5e1bbf..f92650bcde 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -125,7 +125,7 @@ public abstract class Driver implements IDriver {
tryWithLock(
100,
TimeUnit.MILLISECONDS,
- true,
+ false,
() -> {
// only keep doing query processing if driver state is still alive
if (state.get() == State.ALIVE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index 72f5ce64a8..04db5adcb5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -80,6 +80,15 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
next.setAbortCause(DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
scheduler.toAborted(next);
}
+ } finally {
+ // Clear the interrupted flag on the current thread, driver cancellation may have
+ // triggered an interrupt
+ if (Thread.interrupted()) {
+ if (closed) {
+ // reset interrupted flag if closed before interrupt
+ Thread.currentThread().interrupt();
+ }
+ }
}
}
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 199329aa93..715cf29c71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -126,6 +126,9 @@ public class QueryExecution implements IQueryExecution {
// We use this SourceHandle to fetch the TsBlock from it.
private ISourceHandle resultHandle;
+ // used for cleaning resultHandle up exactly once
+ private final AtomicBoolean resultHandleCleanUp;
+
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager;
@@ -186,6 +189,7 @@ public class QueryExecution implements IQueryExecution {
}
});
this.stopped = new AtomicBoolean(false);
+ this.resultHandleCleanUp = new AtomicBoolean(false);
}
@FunctionalInterface
@@ -259,6 +263,7 @@ public class QueryExecution implements IQueryExecution {
context.prepareForRetry();
// re-stop
this.stopped.compareAndSet(true, false);
+ this.resultHandleCleanUp.compareAndSet(true, false);
// re-analyze the query
this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
// re-start the QueryExecution
@@ -407,7 +412,7 @@ public class QueryExecution implements IQueryExecution {
// We don't need to deal with MemorySourceHandle because it doesn't register to memory pool
// We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream
// FragmentInstanceId to register
- if (resultHandle instanceof SourceHandle) {
+ if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) {
TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()