You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/28 01:26:10 UTC
[iotdb] 01/01: [IOTDB-5833] Fix concurrency bug caused by non-atomic operation in QueryExecution
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch QueryExecutionSafe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4aaa07d8adff717c78c10e849ce5918b61fcfc62
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 28 09:25:53 2023 +0800
[IOTDB-5833] Fix concurrency bug caused by non-atomic operation in QueryExecution
---
.../execution/exchange/source/SourceHandle.java | 2 +-
.../db/mpp/plan/execution/QueryExecution.java | 34 ++++++++++++----------
2 files changed, 20 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 766c6b5b4f..5b91d1d984 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -342,7 +342,7 @@ public class SourceHandle implements ISourceHandle {
}
@Override
- public void abort(Throwable t) {
+ public synchronized void abort(Throwable t) {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (aborted || closed) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 199329aa93..c6c82c8f20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -455,26 +455,30 @@ public class QueryExecution implements IQueryExecution {
// iterate until we get a non-nullable TsBlock or result is finished
while (true) {
try {
- if (resultHandle.isAborted()) {
- logger.warn("[ResultHandleAborted]");
- stateMachine.transitionToAborted();
- if (stateMachine.getFailureStatus() != null) {
- throw new IoTDBException(
- stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
- } else {
- throw new IoTDBException(
- stateMachine.getFailureMessage(),
- TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ ListenableFuture<?> blocked;
+ synchronized (resultHandle) {
+ if (resultHandle.isAborted()) {
+ logger.warn("[ResultHandleAborted]");
+ stateMachine.transitionToAborted();
+ if (stateMachine.getFailureStatus() != null) {
+ throw new IoTDBException(
+ stateMachine.getFailureStatus().getMessage(),
+ stateMachine.getFailureStatus().code);
+ } else {
+ throw new IoTDBException(
+ stateMachine.getFailureMessage(),
+ TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ }
+ } else if (resultHandle.isFinished()) {
+ logger.debug("[ResultHandleFinished]");
+ stateMachine.transitionToFinished();
+ return Optional.empty();
}
- } else if (resultHandle.isFinished()) {
- logger.debug("[ResultHandleFinished]");
- stateMachine.transitionToFinished();
- return Optional.empty();
+ blocked = resultHandle.isBlocked();
}
long startTime = System.nanoTime();
try {
- ListenableFuture<?> blocked = resultHandle.isBlocked();
blocked.get();
} finally {
QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, System.nanoTime() - startTime);