You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/28 01:26:10 UTC

[iotdb] 01/01: [IOTDB-5833] Fix concurrency bug caused by non-atomic operations in QueryExecution

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryExecutionSafe
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4aaa07d8adff717c78c10e849ce5918b61fcfc62
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 28 09:25:53 2023 +0800

    [IOTDB-5833] Fix concurrency bug caused by non-atomic operations in QueryExecution
---
 .../execution/exchange/source/SourceHandle.java    |  2 +-
 .../db/mpp/plan/execution/QueryExecution.java      | 34 ++++++++++++----------
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 766c6b5b4f..5b91d1d984 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -342,7 +342,7 @@ public class SourceHandle implements ISourceHandle {
   }
 
   @Override
-  public void abort(Throwable t) {
+  public synchronized void abort(Throwable t) {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       if (aborted || closed) {
         return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 199329aa93..c6c82c8f20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -455,26 +455,30 @@ public class QueryExecution implements IQueryExecution {
     // iterate until we get a non-nullable TsBlock or result is finished
     while (true) {
       try {
-        if (resultHandle.isAborted()) {
-          logger.warn("[ResultHandleAborted]");
-          stateMachine.transitionToAborted();
-          if (stateMachine.getFailureStatus() != null) {
-            throw new IoTDBException(
-                stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
-          } else {
-            throw new IoTDBException(
-                stateMachine.getFailureMessage(),
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+        ListenableFuture<?> blocked;
+        synchronized (resultHandle) {
+          if (resultHandle.isAborted()) {
+            logger.warn("[ResultHandleAborted]");
+            stateMachine.transitionToAborted();
+            if (stateMachine.getFailureStatus() != null) {
+              throw new IoTDBException(
+                  stateMachine.getFailureStatus().getMessage(),
+                  stateMachine.getFailureStatus().code);
+            } else {
+              throw new IoTDBException(
+                  stateMachine.getFailureMessage(),
+                  TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+            }
+          } else if (resultHandle.isFinished()) {
+            logger.debug("[ResultHandleFinished]");
+            stateMachine.transitionToFinished();
+            return Optional.empty();
           }
-        } else if (resultHandle.isFinished()) {
-          logger.debug("[ResultHandleFinished]");
-          stateMachine.transitionToFinished();
-          return Optional.empty();
+          blocked = resultHandle.isBlocked();
         }
 
         long startTime = System.nanoTime();
         try {
-          ListenableFuture<?> blocked = resultHandle.isBlocked();
           blocked.get();
         } finally {
           QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, System.nanoTime() - startTime);