You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/28 01:26:09 UTC

[iotdb] branch QueryExecutionSafe created (now 4aaa07d8ad)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch QueryExecutionSafe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 4aaa07d8ad [IOTDB-5833] Fix concurrent bug caused by non-atomic operation in QueryExecution

This branch includes the following new commits:

     new 4aaa07d8ad [IOTDB-5833] Fix concurrent bug caused by non-atomic operation in QueryExecution

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5833] Fix concurrent bug caused by non-atomic operation in QueryExecution

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryExecutionSafe
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4aaa07d8adff717c78c10e849ce5918b61fcfc62
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 28 09:25:53 2023 +0800

    [IOTDB-5833] Fix concurrent bug caused by non-atomic operation in QueryExecution
---
 .../execution/exchange/source/SourceHandle.java    |  2 +-
 .../db/mpp/plan/execution/QueryExecution.java      | 34 ++++++++++++----------
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 766c6b5b4f..5b91d1d984 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -342,7 +342,7 @@ public class SourceHandle implements ISourceHandle {
   }
 
   @Override
-  public void abort(Throwable t) {
+  public synchronized void abort(Throwable t) {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       if (aborted || closed) {
         return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 199329aa93..c6c82c8f20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -455,26 +455,30 @@ public class QueryExecution implements IQueryExecution {
     // iterate until we get a non-nullable TsBlock or result is finished
     while (true) {
       try {
-        if (resultHandle.isAborted()) {
-          logger.warn("[ResultHandleAborted]");
-          stateMachine.transitionToAborted();
-          if (stateMachine.getFailureStatus() != null) {
-            throw new IoTDBException(
-                stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
-          } else {
-            throw new IoTDBException(
-                stateMachine.getFailureMessage(),
-                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+        ListenableFuture<?> blocked;
+        synchronized (resultHandle) {
+          if (resultHandle.isAborted()) {
+            logger.warn("[ResultHandleAborted]");
+            stateMachine.transitionToAborted();
+            if (stateMachine.getFailureStatus() != null) {
+              throw new IoTDBException(
+                  stateMachine.getFailureStatus().getMessage(),
+                  stateMachine.getFailureStatus().code);
+            } else {
+              throw new IoTDBException(
+                  stateMachine.getFailureMessage(),
+                  TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+            }
+          } else if (resultHandle.isFinished()) {
+            logger.debug("[ResultHandleFinished]");
+            stateMachine.transitionToFinished();
+            return Optional.empty();
           }
-        } else if (resultHandle.isFinished()) {
-          logger.debug("[ResultHandleFinished]");
-          stateMachine.transitionToFinished();
-          return Optional.empty();
+          blocked = resultHandle.isBlocked();
         }
 
         long startTime = System.nanoTime();
         try {
-          ListenableFuture<?> blocked = resultHandle.isBlocked();
           blocked.get();
         } finally {
           QUERY_METRICS.recordExecutionCost(WAIT_FOR_RESULT, System.nanoTime() - startTime);