You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/12 11:08:59 UTC

[iotdb] branch QueryException created (now 8322e9e83d)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch QueryException
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 8322e9e83d Fix compiler error

This branch includes the following new commits:

     new 50ed598efc make query error more concrete
     new 55a032e3ed Done
     new 8322e9e83d Fix compiler error

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/03: make query error more concrete

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryException
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 50ed598efc04e8b6aa05d82cac36fe57be6db919
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Jul 12 13:53:27 2022 +0800

    make query error more concrete
---
 .../db/mpp/execution/exchange/ISourceHandle.java   |  12 +-
 .../mpp/execution/exchange/LocalSourceHandle.java  |  45 +++++-
 .../execution/exchange/MPPDataExchangeManager.java |  12 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java |  25 +++-
 .../db/mpp/execution/exchange/SourceHandle.java    |  48 +++++--
 .../fragment/FragmentInstanceContext.java          |   7 +
 .../fragment/FragmentInstanceExecution.java        |  12 +-
 .../fragment/FragmentInstanceManager.java          |   3 +-
 .../operator/source/ExchangeOperator.java          |   2 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |   8 +-
 .../db/mpp/plan/execution/IQueryExecution.java     |   3 +-
 .../db/mpp/plan/execution/QueryExecution.java      |  32 ++++-
 .../plan/execution/memory/MemorySourceHandle.java  |   3 +
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |   1 +
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 158 +++++++++++----------
 .../mpprest/handler/QueryDataSetHandler.java       |  21 +--
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   2 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  80 ++++++++---
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |   3 +-
 thrift/src/main/thrift/datanode.thrift             |   1 +
 20 files changed, 339 insertions(+), 139 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
index be62bff4ff..f2538c8db7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
@@ -50,8 +50,18 @@ public interface ISourceHandle {
   boolean isAborted();
 
   /**
-   * Abort the handle. Discard all tsblocks which may still be in the memory buffer and complete the
+   * Abort the handle. Discard all tsblocks which may still be in the memory buffer and cancel the
    * future returned by {@link #isBlocked()}.
+   *
+   * <p>Should only be called in abnormal case
    */
   void abort();
+
+  /**
+   * Close the handle. Discard all tsblocks which may still be in the memory buffer and complete the
+   * future returned by {@link #isBlocked()}.
+   *
+   * <p>Should only be called in normal case
+   */
+  void close();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index c101e7b644..95056cf44b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -43,6 +43,8 @@ public class LocalSourceHandle implements ISourceHandle {
   private final SharedTsBlockQueue queue;
   private boolean aborted = false;
 
+  private boolean closed = false;
+
   private int currSequenceId;
 
   private final String threadName;
@@ -81,9 +83,8 @@ public class LocalSourceHandle implements ISourceHandle {
   @Override
   public TsBlock receive() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      if (aborted) {
-        throw new IllegalStateException("Source handle is aborted.");
-      }
+      checkState();
+
       if (!queue.isBlocked().isDone()) {
         throw new IllegalStateException("Source handle is blocked.");
       }
@@ -123,9 +124,7 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    if (aborted) {
-      throw new IllegalStateException("Source handle is closed.");
-    }
+    checkState();
     return nonCancellationPropagating(queue.isBlocked());
   }
 
@@ -136,6 +135,9 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public void abort() {
+    if (aborted || closed) {
+      return;
+    }
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       logger.info("Source handle is being aborted.");
       synchronized (queue) {
@@ -143,7 +145,7 @@ public class LocalSourceHandle implements ISourceHandle {
           if (aborted) {
             return;
           }
-          queue.destroy();
+          queue.abort();
           aborted = true;
           sourceHandleListener.onAborted(this);
         }
@@ -152,6 +154,35 @@ public class LocalSourceHandle implements ISourceHandle {
     }
   }
 
+  @Override
+  public void close() {
+    if (aborted || closed) {
+      return;
+    }
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+      logger.info("Source handle is being closed.");
+      synchronized (queue) {
+        synchronized (this) {
+          if (aborted) {
+            return;
+          }
+          queue.destroy();
+          closed = true;
+          sourceHandleListener.onFinished(this);
+        }
+      }
+      logger.info("Source handle is closed");
+    }
+  }
+
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("Source handle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("Source Handle is closed.");
+    }
+  }
+
   public TFragmentInstanceId getRemoteFragmentInstanceId() {
     return remoteFragmentInstanceId;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 8aa2a6fa3a..7f087e2a9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -141,7 +141,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             || sourceHandles
                 .get(e.getTargetFragmentInstanceId())
                 .get(e.getTargetPlanNodeId())
-                .isAborted()) {
+                .isAborted()
+            || sourceHandles
+                .get(e.getTargetFragmentInstanceId())
+                .get(e.getTargetPlanNodeId())
+                .isFinished()) {
           // In some scenario, when the SourceHandle sends the data block ACK event, its upstream
           // may
           // have already been stopped. For example, in the query whit LimitOperator, the downstream
@@ -176,7 +180,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             || sourceHandles
                 .get(e.getTargetFragmentInstanceId())
                 .get(e.getTargetPlanNodeId())
-                .isAborted()) {
+                .isAborted()
+            || sourceHandles
+                .get(e.getTargetFragmentInstanceId())
+                .get(e.getTargetPlanNodeId())
+                .isFinished()) {
           logger.warn(
               "received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 7aa96230ba..7cac64a6e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -152,7 +152,7 @@ public class SharedTsBlockQueue {
     return blockedOnMemory;
   }
 
-  /** Destroy the queue and cancel the future. */
+  /** Destroy the queue and complete the future. Should only be called in normal case */
   public void destroy() {
     if (destroyed) {
       return;
@@ -172,4 +172,27 @@ public class SharedTsBlockQueue {
       bufferRetainedSizeInBytes = 0;
     }
   }
+
+  // TODO add Throwable t as a parameter of this method, and then call blocked.setException(t);
+  // instead of blocked.cancel(true);
+  /** Destroy the queue and cancel the future. Should only be called in normal case */
+  public void abort() {
+    if (destroyed) {
+      return;
+    }
+    destroyed = true;
+    if (!blocked.isDone()) {
+      blocked.cancel(true);
+    }
+    if (blockedOnMemory != null) {
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+    }
+    queue.clear();
+    if (bufferRetainedSizeInBytes > 0L) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index facd0c1e0f..15ac9e0529 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -86,6 +86,8 @@ public class SourceHandle implements ISourceHandle {
   private int lastSequenceId = Integer.MAX_VALUE;
   private boolean aborted = false;
 
+  private boolean closed = false;
+
   public SourceHandle(
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -116,9 +118,8 @@ public class SourceHandle implements ISourceHandle {
   public synchronized TsBlock receive() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
 
-      if (aborted) {
-        throw new IllegalStateException("Source handle is aborted.");
-      }
+      checkState();
+
       if (!blocked.isDone()) {
         throw new IllegalStateException("Source handle is blocked.");
       }
@@ -149,7 +150,7 @@ public class SourceHandle implements ISourceHandle {
 
   private synchronized void trySubmitGetDataBlocksTask() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      if (aborted) {
+      if (aborted || closed) {
         return;
       }
       if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
@@ -202,9 +203,7 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public synchronized ListenableFuture<?> isBlocked() {
-    if (aborted) {
-      throw new IllegalStateException("Source handle is aborted.");
-    }
+    checkState();
     return nonCancellationPropagating(blocked);
   }
 
@@ -234,7 +233,7 @@ public class SourceHandle implements ISourceHandle {
   @Override
   public synchronized void abort() {
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
-      if (aborted) {
+      if (aborted || closed) {
         return;
       }
       if (blocked != null && !blocked.isDone()) {
@@ -255,6 +254,31 @@ public class SourceHandle implements ISourceHandle {
     }
   }
 
+  @Override
+  public synchronized void close() {
+    try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+      if (aborted || closed) {
+        return;
+      }
+      if (blocked != null && !blocked.isDone()) {
+        blocked.set(null);
+      }
+      if (blockedOnMemory != null) {
+        bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+      }
+      sequenceIdToDataBlockSize.clear();
+      if (bufferRetainedSizeInBytes > 0) {
+        localMemoryManager
+            .getQueryPool()
+            .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+        bufferRetainedSizeInBytes = 0;
+      }
+      closed = true;
+      currSequenceId = lastSequenceId + 1;
+      sourceHandleListener.onFinished(this);
+    }
+  }
+
   @Override
   public boolean isFinished() {
     return remoteTsBlockedConsumedUp();
@@ -293,6 +317,14 @@ public class SourceHandle implements ISourceHandle {
     return aborted;
   }
 
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("Source handle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("SourceHandle is closed.");
+    }
+  }
+
   @Override
   public String toString() {
     return String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index ffc9d101b3..0d5cdf54d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -150,6 +151,12 @@ public class FragmentInstanceContext extends QueryContext {
     stateMachine.failed(cause);
   }
 
+  public String getFailedCause() {
+    return stateMachine.getFailureCauses().stream()
+        .map(Throwable::getMessage)
+        .collect(Collectors.joining("; "));
+  }
+
   public void finished() {
     stateMachine.finished();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index ab51fa842d..ac70a848dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -84,12 +84,8 @@ public class FragmentInstanceExecution {
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(stateMachine.getState(), context.getEndTime());
-  }
-
-  public void failed(Throwable cause) {
-    requireNonNull(cause, "cause is null");
-    stateMachine.failed(cause);
+    return new FragmentInstanceInfo(
+        stateMachine.getState(), context.getEndTime(), context.getFailedCause());
   }
 
   public void cancel() {
@@ -121,7 +117,9 @@ public class FragmentInstanceExecution {
             sinkHandle.abort();
             // help for gc
             sinkHandle = null;
-            scheduler.abortFragmentInstance(instanceId);
+            if (newState.isFailed()) {
+              scheduler.abortFragmentInstance(instanceId);
+            }
           }
         });
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 504d3e4676..51b689eda2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -194,8 +194,9 @@ public class FragmentInstanceManager {
   }
 
   private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
+    FragmentInstanceContext context = instanceContext.get(instanceId);
     return new FragmentInstanceInfo(
-        FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
+        FragmentInstanceState.FAILED, context.getEndTime(), context.getFailedCause());
   }
 
   private void removeOldInstances() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 3579a642bf..c72063caeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -81,6 +81,6 @@ public class ExchangeOperator implements SourceOperator {
 
   @Override
   public void close() throws Exception {
-    sourceHandle.abort();
+    sourceHandle.close();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 19356f6471..2532cc4cc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.analyze;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -119,7 +120,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         while (coordinator.getQueryExecution(queryId).hasNextResult()) {
           // The query will be transited to FINISHED when invoking getBatchResult() at the last time
           // So we don't need to clean up it manually
-          Optional<TsBlock> tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+          Optional<TsBlock> tsBlock;
+          try {
+            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+          } catch (IoTDBException e) {
+            throw new RuntimeException("Fetch Schema failed. ", e);
+          }
           if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
             break;
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index 70fffda9b6..ba0380d015 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.execution;
 
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
@@ -34,7 +35,7 @@ public interface IQueryExecution {
 
   ExecutionResult getStatus();
 
-  Optional<TsBlock> getBatchResult();
+  Optional<TsBlock> getBatchResult() throws IoTDBException;
 
   boolean hasNextResult();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4bae3eeafc..fe6d48afa2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -64,7 +65,6 @@ import io.airlift.concurrent.SetThreadName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -74,6 +74,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Throwables.throwIfUnchecked;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
@@ -280,11 +281,22 @@ public class QueryExecution implements IQueryExecution {
    * implemented with DataStreamManager)
    */
   @Override
-  public Optional<TsBlock> getBatchResult() {
+  public Optional<TsBlock> getBatchResult() throws IoTDBException {
+    checkArgument(resultHandle != null, "ResultHandle in Coordinator should be init firstly.");
     // iterate until we get a non-nullable TsBlock or result is finished
     while (true) {
       try {
-        if (resultHandle == null || resultHandle.isAborted() || resultHandle.isFinished()) {
+        if (resultHandle.isAborted()) {
+          logger.info("resultHandle for client is aborted");
+          stateMachine.transitionToAborted();
+          if (stateMachine.getFailureStatus() != null) {
+            throw new IoTDBException(
+                stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
+          } else {
+            throw new IoTDBException(
+                stateMachine.getFailureMessage(), TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+          }
+        } else if (resultHandle.isFinished()) {
           // Once the resultHandle is finished, we should transit the state of this query to
           // FINISHED.
           // So that the corresponding cleanup work could be triggered.
@@ -292,6 +304,7 @@ public class QueryExecution implements IQueryExecution {
           stateMachine.transitionToFinished();
           return Optional.empty();
         }
+
         ListenableFuture<?> blocked = resultHandle.isBlocked();
         blocked.get();
         if (!resultHandle.isFinished()) {
@@ -305,13 +318,17 @@ public class QueryExecution implements IQueryExecution {
         }
       } catch (ExecutionException | CancellationException e) {
         stateMachine.transitionToFailed(e);
+        if (stateMachine.getFailureStatus() != null) {
+          throw new IoTDBException(
+              stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
+        }
         Throwable t = e.getCause() == null ? e : e.getCause();
         throwIfUnchecked(t);
-        throw new RuntimeException(t);
+        throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
       } catch (InterruptedException e) {
         stateMachine.transitionToFailed(e);
         Thread.currentThread().interrupt();
-        throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
+        throw new IoTDBException(e, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
       }
     }
   }
@@ -363,7 +380,10 @@ public class QueryExecution implements IQueryExecution {
       }
       return new ExecutionResult(
           context.getQueryId(),
-          RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage()));
+          stateMachine.getFailureStatus() == null
+              ? RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())
+              : stateMachine.getFailureStatus());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index 588407eae1..2cf224962b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -77,4 +77,7 @@ public class MemorySourceHandle implements ISourceHandle {
 
   @Override
   public void abort() {}
+
+  @Override
+  public void close() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 0a6d51eb4a..73730864d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -103,6 +103,7 @@ public class ClusterScheduler implements IScheduler {
         if (result.getFailureStatus() != null) {
           stateMachine.transitionToFailed(result.getFailureStatus());
         } else {
+          // won't get into here
           stateMachine.transitionToFailed(
               new IllegalStateException("Fragment cannot be dispatched"));
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index d3f0d21e86..21a4436a6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -46,16 +47,13 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.concurrent.SetThreadName;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -102,67 +100,45 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     return executor.submit(
         () -> {
           for (FragmentInstance instance : instances) {
-            boolean accepted = dispatchOneInstance(instance);
-            if (!accepted) {
-              return new FragInstanceDispatchResult(false);
+            try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+              dispatchOneInstance(instance);
+            } catch (FragmentInstanceDispatchException e) {
+              return new FragInstanceDispatchResult(e.getFailureStatus());
+            } catch (Throwable t) {
+              logger.error("cannot dispatch FI for read operation", t);
+              return new FragInstanceDispatchResult(
+                  RpcUtils.getStatus(
+                      TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
             }
           }
           return new FragInstanceDispatchResult(true);
         });
   }
 
-  // TODO: (xingtanzjr) Return the detailed write states for each FragmentInstance
-  private Future<FragInstanceDispatchResult> dispatchWrite(List<FragmentInstance> instances) {
-    List<Future<Boolean>> futures = new LinkedList<>();
-    for (FragmentInstance instance : instances) {
-      futures.add(writeOperationExecutor.submit(() -> dispatchOneInstance(instance)));
-    }
-    SettableFuture<FragInstanceDispatchResult> resultFuture = SettableFuture.create();
-    for (Future<Boolean> future : futures) {
-      try {
-        Boolean success = future.get();
-        if (!success) {
-          resultFuture.set(new FragInstanceDispatchResult(false));
-          break;
-        }
-      } catch (ExecutionException | InterruptedException e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        resultFuture.setException(e);
-        break;
-      }
-    }
-    resultFuture.set(new FragInstanceDispatchResult(true));
-    return resultFuture;
-  }
-
   private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) {
-    boolean result = true;
-    try {
-      for (FragmentInstance instance : instances) {
-
-        if (!dispatchOneInstance(instance)) {
-          result = false;
-          break;
-        }
+    for (FragmentInstance instance : instances) {
+      try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+        dispatchOneInstance(instance);
+      } catch (FragmentInstanceDispatchException e) {
+        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+      } catch (Throwable t) {
+        logger.error("cannot dispatch FI for write operation", t);
+        return immediateFuture(
+            new FragInstanceDispatchResult(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
       }
-      return immediateFuture(new FragInstanceDispatchResult(result));
-    } catch (FragmentInstanceDispatchException e) {
-      logger.error("cannot dispatch FI for write operation", e);
-      return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
     }
+    return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
-  private boolean dispatchOneInstance(FragmentInstance instance)
+  private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
-    try (SetThreadName fragmentInstanceName = new SetThreadName(instance.getId().getFullId())) {
-      TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
-      if (isDispatchedToLocal(endPoint)) {
-        return dispatchLocally(instance);
-      } else {
-        return dispatchRemote(instance, endPoint);
-      }
+    TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
+    if (isDispatchedToLocal(endPoint)) {
+      dispatchLocally(instance);
+    } else {
+      dispatchRemote(instance, endPoint);
     }
   }
 
@@ -170,7 +146,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
   }
 
-  private boolean dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+  private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
       throws FragmentInstanceDispatchException {
     try (SyncDataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
@@ -182,7 +158,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
                   instance.getRegionReplicaSet().getRegionId());
           TSendFragmentInstanceResp sendFragmentInstanceResp =
               client.sendFragmentInstance(sendFragmentInstanceReq);
-          return sendFragmentInstanceResp.accepted;
+          if (!sendFragmentInstanceResp.accepted) {
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(
+                    TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message));
+          }
         case WRITE:
           TSendPlanNodeReq sendPlanNodeReq =
               new TSendPlanNodeReq(
@@ -193,36 +173,67 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
             logger.error(sendPlanNodeResp.getStatus().message);
             throw new FragmentInstanceDispatchException(sendPlanNodeResp.getStatus());
           }
-          return true;
       }
     } catch (IOException | TException e) {
       logger.error("can't connect to node {}", endPoint, e);
-      throw new FragmentInstanceDispatchException(e);
+      TSStatus status = new TSStatus();
+      status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode());
+      status.setMessage("can't connect to node {}" + endPoint);
+      throw new FragmentInstanceDispatchException(status);
     }
-    return false;
   }
 
-  private boolean dispatchLocally(FragmentInstance instance)
-      throws FragmentInstanceDispatchException {
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.createFromTConsensusGroupId(
-            instance.getRegionReplicaSet().getRegionId());
+  private void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDispatchException {
+    // deserialize ConsensusGroupId
+    ConsensusGroupId groupId;
+    try {
+      groupId =
+          ConsensusGroupId.Factory.createFromTConsensusGroupId(
+              instance.getRegionReplicaSet().getRegionId());
+    } catch (Throwable t) {
+      logger.error("Deserialize ConsensusGroupId failed. ", t);
+      throw new FragmentInstanceDispatchException(
+          RpcUtils.getStatus(
+              TSStatusCode.EXECUTE_STATEMENT_ERROR,
+              "Deserialize ConsensusGroupId failed: " + t.getMessage()));
+    }
+
     switch (instance.getType()) {
       case READ:
+        // execute fragment instance in state machine
         ConsensusReadResponse readResponse;
-        if (groupId instanceof DataRegionId) {
-          readResponse = DataRegionConsensusImpl.getInstance().read(groupId, instance);
-        } else {
-          readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, instance);
+        try {
+          if (groupId instanceof DataRegionId) {
+            readResponse = DataRegionConsensusImpl.getInstance().read(groupId, instance);
+          } else {
+            readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, instance);
+          }
+        } catch (Throwable t) {
+          logger.error("Execute FragmentInstance in ConsensusGroup {} failed.", groupId, t);
+          throw new FragmentInstanceDispatchException(
+              RpcUtils.getStatus(
+                  TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                  "Execute FragmentInstance failed. " + t.getMessage()));
         }
         if (!readResponse.isSuccess()) {
           logger.error(
-              "dispatch FragmentInstance {} locally failed because {}",
+              "dispatch FragmentInstance {} locally failed. ",
               instance,
               readResponse.getException());
-          return false;
+          throw new FragmentInstanceDispatchException(
+              RpcUtils.getStatus(
+                  TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                  "Execute FragmentInstance failed: "
+                      + (readResponse.getException() == null
+                          ? ""
+                          : readResponse.getException().getMessage())));
+        } else {
+          FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset();
+          if (!info.getState().isFailed()) {
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, info.getMessage()));
+          }
         }
-        return !((FragmentInstanceInfo) readResponse.getDataset()).getState().isFailed();
       case WRITE:
         PlanNode planNode = instance.getFragment().getRoot();
         boolean hasFailedMeasurement = false;
@@ -258,11 +269,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
               RpcUtils.getStatus(
                   TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
         }
-
-        return true;
     }
-    throw new UnsupportedOperationException(
-        String.format("unknown query type [%s]", instance.getType()));
+
+    throw new FragmentInstanceDispatchException(
+        RpcUtils.getStatus(
+            TSStatusCode.INTERNAL_SERVER_ERROR,
+            String.format("unknown query type [%s]", instance.getType())));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java
index eaa55d1d3f..2fc7ad4976 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.iotdb.db.protocol.mpprest.handler;
 
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
@@ -47,7 +48,7 @@ public class QueryDataSetHandler {
    */
   public static Response fillQueryDataSet(
       IQueryExecution queryExecution, Statement statement, int actualRowSizeLimit)
-      throws IOException {
+      throws IOException, IoTDBException {
     if (statement instanceof ShowStatement) {
       return fillShowPlanDataSet(queryExecution, actualRowSizeLimit);
     } else if (statement instanceof QueryStatement) {
@@ -67,7 +68,8 @@ public class QueryDataSetHandler {
   }
 
   public static Response fillDataSetWithTimestamps(
-      IQueryExecution queryExecution, final int actualRowSizeLimit, final long timePrecision) {
+      IQueryExecution queryExecution, final int actualRowSizeLimit, final long timePrecision)
+      throws IoTDBException {
     org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet =
         new org.apache.iotdb.db.protocol.rest.model.QueryDataSet();
     DatasetHeader header = queryExecution.getDatasetHeader();
@@ -81,7 +83,7 @@ public class QueryDataSetHandler {
   }
 
   public static Response fillAggregationPlanDataSet(
-      IQueryExecution queryExecution, final int actualRowSizeLimit) {
+      IQueryExecution queryExecution, final int actualRowSizeLimit) throws IoTDBException {
 
     org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet =
         new org.apache.iotdb.db.protocol.rest.model.QueryDataSet();
@@ -97,7 +99,7 @@ public class QueryDataSetHandler {
   }
 
   private static Response fillShowPlanDataSet(
-      IQueryExecution queryExecution, final int actualRowSizeLimit) throws IOException {
+      IQueryExecution queryExecution, final int actualRowSizeLimit) throws IoTDBException {
     org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet =
         new org.apache.iotdb.db.protocol.rest.model.QueryDataSet();
     initTargetDatasetOrderByOrderWithSourceDataSet(
@@ -135,7 +137,8 @@ public class QueryDataSetHandler {
       IQueryExecution queryExecution,
       int actualRowSizeLimit,
       org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet,
-      final long timePrecision) {
+      final long timePrecision)
+      throws IoTDBException {
     int fetched = 0;
     int columnNum = queryExecution.getOutputValueColumnCount();
     while (fetched < actualRowSizeLimit) {
@@ -177,7 +180,8 @@ public class QueryDataSetHandler {
   private static Response fillQueryDataSetWithoutTimestamps(
       IQueryExecution queryExecution,
       int actualRowSizeLimit,
-      org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet) {
+      org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet)
+      throws IoTDBException {
     int fetched = 0;
     int columnNum = queryExecution.getOutputValueColumnCount();
     while (fetched < actualRowSizeLimit) {
@@ -210,7 +214,7 @@ public class QueryDataSetHandler {
   }
 
   public static Response fillGrafanaVariablesResult(
-      IQueryExecution queryExecution, Statement statement) {
+      IQueryExecution queryExecution, Statement statement) throws IoTDBException {
     List<String> results = new ArrayList<>();
     Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
     if (!optionalTsBlock.isPresent()) {
@@ -232,7 +236,8 @@ public class QueryDataSetHandler {
     return Response.ok().entity(results).build();
   }
 
-  public static Response fillGrafanaNodesResult(IQueryExecution queryExecution) throws IOException {
+  public static Response fillGrafanaNodesResult(IQueryExecution queryExecution)
+      throws IoTDBException {
     List<String> nodes = new ArrayList<>();
     Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
     if (!optionalTsBlock.isPresent()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 844ffa66ad..550ef32f60 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -545,7 +545,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       if (s == null) {
         return RpcUtils.getTSExecuteStatementResp(
             RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported"));
+                TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
       }
       // permission check
       TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 16b04f4e7b..048d791e39 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -92,6 +92,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
+import com.google.common.collect.ImmutableList;
 import io.airlift.concurrent.SetThreadName;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -118,26 +119,63 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   @Override
   public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
     LOGGER.info("receive FragmentInstance to group[{}]", req.getConsensusGroupId());
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    ConsensusReadResponse readResponse;
+
+    // deserialize ConsensusGroupId
+    ConsensusGroupId groupId;
+    try {
+      groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    } catch (Throwable t) {
+      LOGGER.error("Deserialize ConsensusGroupId failed. ", t);
+      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
+      resp.setMessage("Deserialize ConsensusGroupId failed: " + t.getMessage());
+      return resp;
+    }
+
     // We deserialize here instead of the underlying state machine because parallelism is possible
     // here but not at the underlying state machine
-    FragmentInstance fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
-    if (groupId instanceof DataRegionId) {
-      readResponse = DataRegionConsensusImpl.getInstance().read(groupId, fragmentInstance);
-    } else {
-      readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, fragmentInstance);
+    FragmentInstance fragmentInstance;
+    try {
+      fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
+    } catch (Throwable t) {
+      LOGGER.error("Deserialize FragmentInstance failed.", t);
+      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
+      resp.setMessage("Deserialize FragmentInstance failed: " + t.getMessage());
+      return resp;
     }
-    if (!readResponse.isSuccess()) {
+
+    // execute fragment instance in state machine
+    ConsensusReadResponse readResponse;
+    try (SetThreadName threadName = new SetThreadName(fragmentInstance.getId().getFullId())) {
+      if (groupId instanceof DataRegionId) {
+        readResponse = DataRegionConsensusImpl.getInstance().read(groupId, fragmentInstance);
+      } else {
+        readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, fragmentInstance);
+      }
+      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
+      if (!readResponse.isSuccess()) {
+        LOGGER.error(
+            "Execute FragmentInstance in ConsensusGroup {} failed.",
+            req.getConsensusGroupId(),
+            readResponse.getException());
+        resp.setAccepted(false);
+        resp.setMessage(
+            "Execute FragmentInstance failed: "
+                + (readResponse.getException() == null
+                    ? ""
+                    : readResponse.getException().getMessage()));
+      } else {
+        FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset();
+        resp.setAccepted(!info.getState().isFailed());
+        resp.setMessage(info.getMessage());
+      }
+      return resp;
+    } catch (Throwable t) {
       LOGGER.error(
-          "execute FragmentInstance in ConsensusGroup {} failed because {}",
-          req.getConsensusGroupId(),
-          readResponse.getException());
-      return new TSendFragmentInstanceResp(false);
+          "Execute FragmentInstance in ConsensusGroup {} failed.", req.getConsensusGroupId(), t);
+      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
+      resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+      return resp;
     }
-    FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset();
-    return new TSendFragmentInstanceResp(!info.getState().isFailed());
   }
 
   @Override
@@ -204,11 +242,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   @Override
   public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
     FragmentInstanceId instanceId = FragmentInstanceId.fromThrift(req.fragmentInstanceId);
-    try (SetThreadName threadName = new SetThreadName(instanceId.getFullId())) {
-      FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
-      return info != null
-          ? new TFragmentInstanceStateResp(info.getState().toString())
-          : new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
+    FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
+    if (info != null) {
+      TFragmentInstanceStateResp resp = new TFragmentInstanceStateResp(info.getState().toString());
+      resp.setFailedMessages(ImmutableList.of(info.getMessage()));
+      return resp;
+    } else {
+      return new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 3b8d2bd608..f7e9640455 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
@@ -178,7 +179,7 @@ public class QueryDataSetUtils {
   }
 
   public static TSQueryDataSet convertTsBlockByFetchSize(
-      IQueryExecution queryExecution, int fetchSize) throws IOException {
+      IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException {
     int columnNum = queryExecution.getOutputValueColumnCount();
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
     // one time column and each value column has an actual value buffer and a bitmap value to
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 3ceeefb0c4..b0371aafc6 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -116,6 +116,7 @@ struct TFetchFragmentInstanceStateReq {
 // TODO: need to supply more fields according to implementation
 struct TFragmentInstanceStateResp {
   1: required string state
+  2: optional list<string> failedMessages
 }
 
 struct TCancelQueryReq {


[iotdb] 02/03: Done

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryException
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 55a032e3edfdef849194eb4f84dd3ee0676595d3
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Jul 12 16:36:42 2022 +0800

    Done
---
 .../db/mpp/execution/exchange/ISinkHandle.java     | 10 ++++++
 .../db/mpp/execution/exchange/LocalSinkHandle.java | 40 +++++++++++++++++-----
 .../mpp/execution/exchange/LocalSourceHandle.java  |  2 +-
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 18 +++++-----
 .../db/mpp/execution/exchange/SinkHandle.java      | 35 +++++++++++++++----
 .../fragment/FragmentInstanceExecution.java        |  6 +++-
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 20 +++++++++++
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  1 -
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  6 ++++
 9 files changed, 111 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index b4be67e164..4450ca8c21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -68,6 +68,16 @@ public interface ISinkHandle {
   /**
    * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
    * the future returned by {@link #isFull()}.
+   *
+   * <p>Should only be called in abnormal case
    */
   void abort();
+
+  /**
+   * Close the sink handle. Discard all tsblocks which may still be in the memory buffer and
+   * complete the future returned by {@link #isFull()}.
+   *
+   * <p>Should only be called in normal case.
+   */
+  void close();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index 852e79ac8b..d6960e7664 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -45,6 +45,7 @@ public class LocalSinkHandle implements ISinkHandle {
   private final SharedTsBlockQueue queue;
   private volatile ListenableFuture<Void> blocked = immediateFuture(null);
   private boolean aborted = false;
+  private boolean closed = false;
 
   public LocalSinkHandle(
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -72,9 +73,7 @@ public class LocalSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized ListenableFuture<?> isFull() {
-    if (aborted) {
-      throw new IllegalStateException("Sink handle is closed.");
-    }
+    checkState();
     return nonCancellationPropagating(blocked);
   }
 
@@ -104,9 +103,7 @@ public class LocalSinkHandle implements ISinkHandle {
   public void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
     synchronized (this) {
-      if (aborted) {
-        throw new IllegalStateException("Sink handle is aborted.");
-      }
+      checkState();
       if (!blocked.isDone()) {
         throw new IllegalStateException("Sink handle is blocked.");
       }
@@ -135,7 +132,8 @@ public class LocalSinkHandle implements ISinkHandle {
     synchronized (queue) {
       synchronized (this) {
         logger.info("set noMoreTsBlocks.");
-        if (aborted) {
+        if (aborted || closed) {
+          logger.info("SinkHandle has been aborted={} or closed={}.", aborted, closed);
           return;
         }
         queue.setNoMoreTsBlocks(true);
@@ -151,17 +149,33 @@ public class LocalSinkHandle implements ISinkHandle {
     logger.info("Sink handle is being aborted.");
     synchronized (queue) {
       synchronized (this) {
-        if (aborted) {
+        if (aborted || closed) {
           return;
         }
         aborted = true;
-        queue.destroy();
+        queue.abort();
         sinkHandleListener.onAborted(this);
       }
     }
     logger.info("Sink handle is aborted");
   }
 
+  @Override
+  public void close() {
+    logger.info("Sink handle is being closed.");
+    synchronized (queue) {
+      synchronized (this) {
+        if (aborted || closed) {
+          return;
+        }
+        closed = true;
+        queue.close();
+        sinkHandleListener.onFinish(this);
+      }
+    }
+    logger.info("Sink handle is closed");
+  }
+
   public TFragmentInstanceId getRemoteFragmentInstanceId() {
     return remoteFragmentInstanceId;
   }
@@ -173,4 +187,12 @@ public class LocalSinkHandle implements ISinkHandle {
   SharedTsBlockQueue getSharedTsBlockQueue() {
     return queue;
   }
+
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("Sink handle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("Sink Handle is closed.");
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 95056cf44b..3bc455592e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -166,7 +166,7 @@ public class LocalSourceHandle implements ISourceHandle {
           if (aborted) {
             return;
           }
-          queue.destroy();
+          queue.close();
           closed = true;
           sourceHandleListener.onFinished(this);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 7cac64a6e3..29cc6fb44b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -53,7 +53,7 @@ public class SharedTsBlockQueue {
 
   private ListenableFuture<Void> blockedOnMemory;
 
-  private boolean destroyed = false;
+  private boolean closed = false;
 
   private LocalSourceHandle sourceHandle;
   private LocalSinkHandle sinkHandle;
@@ -93,7 +93,7 @@ public class SharedTsBlockQueue {
   /** Notify no more tsblocks will be added to the queue. */
   public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
     logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
-    if (destroyed) {
+    if (closed) {
       throw new IllegalStateException("queue has been destroyed");
     }
     this.noMoreTsBlocks = noMoreTsBlocks;
@@ -110,7 +110,7 @@ public class SharedTsBlockQueue {
    * returned by {@link #isBlocked()} completes.
    */
   public TsBlock remove() {
-    if (destroyed) {
+    if (closed) {
       throw new IllegalStateException("queue has been destroyed");
     }
     TsBlock tsBlock = queue.remove();
@@ -134,7 +134,7 @@ public class SharedTsBlockQueue {
    * the returned future of last invocation completes.
    */
   public ListenableFuture<Void> add(TsBlock tsBlock) {
-    if (destroyed) {
+    if (closed) {
       throw new IllegalStateException("queue has been destroyed");
     }
 
@@ -153,11 +153,11 @@ public class SharedTsBlockQueue {
   }
 
   /** Destroy the queue and complete the future. Should only be called in normal case */
-  public void destroy() {
-    if (destroyed) {
+  public void close() {
+    if (closed) {
       return;
     }
-    destroyed = true;
+    closed = true;
     if (!blocked.isDone()) {
       blocked.set(null);
     }
@@ -177,10 +177,10 @@ public class SharedTsBlockQueue {
   // instead of blocked.cancel(true);
   /** Destroy the queue and cancel the future. Should only be called in normal case */
   public void abort() {
-    if (destroyed) {
+    if (closed) {
       return;
     }
-    destroyed = true;
+    closed = true;
     if (!blocked.isDone()) {
       blocked.cancel(true);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 3619de44f4..e8247cc621 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -82,6 +82,9 @@ public class SinkHandle implements ISinkHandle {
   private long bufferRetainedSizeInBytes = 0;
 
   private boolean aborted = false;
+
+  private boolean closed = false;
+
   private boolean noMoreTsBlocks = false;
 
   public SinkHandle(
@@ -110,9 +113,7 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public synchronized ListenableFuture<?> isFull() {
-    if (aborted) {
-      throw new IllegalStateException("Sink handle is aborted.");
-    }
+    checkState();
     return nonCancellationPropagating(blocked);
   }
 
@@ -123,9 +124,7 @@ public class SinkHandle implements ISinkHandle {
   @Override
   public synchronized void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
-    if (aborted) {
-      throw new IllegalStateException("Sink handle is aborted.");
-    }
+    checkState();
     if (!blocked.isDone()) {
       throw new IllegalStateException("Sink handle is blocked.");
     }
@@ -223,6 +222,22 @@ public class SinkHandle implements ISinkHandle {
     logger.info("SinkHandle is aborted");
   }
 
+  @Override
+  public void close() {
+    logger.info("SinkHandle is being closed.");
+    sequenceIdToTsBlock.clear();
+    closed = true;
+    bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+    if (bufferRetainedSizeInBytes > 0) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+    sinkHandleListener.onFinish(this);
+    logger.info("SinkHandle is closed");
+  }
+
   @Override
   public boolean isAborted() {
     return aborted;
@@ -306,6 +321,14 @@ public class SinkHandle implements ISinkHandle {
         localFragmentInstanceId.instanceId);
   }
 
+  private void checkState() {
+    if (aborted) {
+      throw new IllegalStateException("Sink handle is aborted.");
+    } else if (closed) {
+      throw new IllegalStateException("SinkHandle is closed.");
+    }
+  }
+
   @TestOnly
   public void setRetryIntervalInMs(long retryIntervalInMs) {
     this.retryIntervalInMs = retryIntervalInMs;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index ac70a848dd..c1fd9568a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -114,7 +114,11 @@ public class FragmentInstanceExecution {
             driver.close();
             // help for gc
             driver = null;
-            sinkHandle.abort();
+            if (newState.isFailed()) {
+              sinkHandle.abort();
+            } else {
+              sinkHandle.close();
+            }
             // help for gc
             sinkHandle = null;
             if (newState.isFailed()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 2a67c35049..e3f4d0fc66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -153,6 +153,26 @@ public class MemoryPool {
     return ((MemoryReservationFuture<Void>) future).getBytes();
   }
 
+  /**
+   * Complete the specified memory reservation. If the reservation has finished, do nothing.
+   *
+   * @param future The future returned from {@link #reserve(String, long)}
+   * @return If the future has not complete, return the number of bytes being reserved. Otherwise,
+   *     return 0.
+   */
+  public synchronized long tryComplete(ListenableFuture<Void> future) {
+    Validate.notNull(future);
+    // If the future is not a MemoryReservationFuture, it must have been completed.
+    if (future.isDone()) {
+      return 0L;
+    }
+    Validate.isTrue(
+        future instanceof MemoryReservationFuture,
+        "invalid future type " + future.getClass().getSimpleName());
+    ((MemoryReservationFuture<Void>) future).set(null);
+    return ((MemoryReservationFuture<Void>) future).getBytes();
+  }
+
   public synchronized void free(String queryId, long bytes) {
     Validate.notNull(queryId);
     Validate.isTrue(bytes > 0L);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 73730864d2..9563357d12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -99,7 +99,6 @@ public class ClusterScheduler implements IScheduler {
     try {
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (!result.isSuccessful()) {
-        logger.error("dispatch failed.");
         if (result.getFailureStatus() != null) {
           stateMachine.transitionToFailed(result.getFailureStatus());
         } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 21a4436a6b..4a7c403791 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -159,6 +159,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
           TSendFragmentInstanceResp sendFragmentInstanceResp =
               client.sendFragmentInstance(sendFragmentInstanceReq);
           if (!sendFragmentInstanceResp.accepted) {
+            logger.error(sendFragmentInstanceResp.message);
             throw new FragmentInstanceDispatchException(
                 RpcUtils.getStatus(
                     TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message));
@@ -208,6 +209,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
           } else {
             readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, instance);
           }
+          if (readResponse == null) {
+            logger.error("ReadResponse is null");
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "ReadResponse is null"));
+          }
         } catch (Throwable t) {
           logger.error("Execute FragmentInstance in ConsensusGroup {} failed.", groupId, t);
           throw new FragmentInstanceDispatchException(


[iotdb] 03/03: Fix compiler error

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch QueryException
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8322e9e83ddc4db1122caf4d956790b3befb1fd4
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Jul 12 19:08:43 2022 +0800

    Fix compiler error
---
 .../org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java  | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index 5e29f47066..d68ad6726a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -93,6 +93,12 @@ public class StubSinkHandle implements ISinkHandle {
     tsBlocks.clear();
   }
 
+  @Override
+  public void close() {
+    closed = true;
+    tsBlocks.clear();
+  }
+
   public List<TsBlock> getTsBlocks() {
     return tsBlocks;
   }