You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/19 06:05:40 UTC

[iotdb] branch master updated: [IOTDB-5689] Close Isink when ISourceHandle is closed

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 487dd31417 [IOTDB-5689] Close Isink when ISourceHandle is closed
487dd31417 is described below

commit 487dd31417abf0a04864b1a812e191784a7d2e98
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sun Mar 19 14:05:34 2023 +0800

    [IOTDB-5689] Close Isink when ISourceHandle is closed
---
 .../iotdb/db/mpp/execution/driver/Driver.java      |  13 +-
 .../execution/exchange/MPPDataExchangeManager.java |  67 ++-
 .../mpp/execution/exchange/SharedTsBlockQueue.java |   6 +-
 .../db/mpp/execution/exchange/sink/ISink.java      |   4 +
 .../mpp/execution/exchange/sink/ISinkHandle.java   |   3 +
 .../execution/exchange/sink/LocalSinkChannel.java  |   7 +
 .../execution/exchange/sink/ShuffleSinkHandle.java |  24 +-
 .../mpp/execution/exchange/sink/SinkChannel.java   |  11 +-
 .../execution/exchange/source/SourceHandle.java    |  59 ++-
 .../iotdb/db/mpp/execution/operator/Operator.java  |  10 +-
 .../process/AbstractConsumeAllOperator.java        |   4 +-
 .../operator/process/AbstractIntoOperator.java     |   7 +-
 .../operator/process/AggregationOperator.java      |   6 +-
 .../operator/process/DeviceMergeOperator.java      |   6 +-
 .../operator/process/DeviceViewOperator.java       |  17 +-
 .../execution/operator/process/FillOperator.java   |   6 +-
 .../operator/process/FilterAndProjectOperator.java |   6 +-
 .../execution/operator/process/LimitOperator.java  |   6 +-
 .../operator/process/LinearFillOperator.java       |   8 +-
 .../operator/process/MergeSortOperator.java        |   8 +-
 .../execution/operator/process/OffsetOperator.java |   6 +-
 .../process/RawDataAggregationOperator.java        |   9 +-
 .../operator/process/SingleDeviceViewOperator.java |   6 +-
 .../process/SingleInputAggregationOperator.java    |   6 +-
 .../process/SlidingWindowAggregationOperator.java  |   4 +-
 .../execution/operator/process/SortOperator.java   |   6 +-
 .../operator/process/TagAggregationOperator.java   |   8 +-
 .../operator/process/TransformOperator.java        |  16 +-
 .../process/join/HorizontallyConcatOperator.java   |   8 +-
 .../process/join/RowBasedTimeJoinOperator.java     |  11 +-
 .../operator/process/join/TimeJoinOperator.java    |   9 +-
 .../last/AbstractUpdateLastCacheOperator.java      |   4 +-
 .../last/AlignedUpdateLastCacheOperator.java       |   2 +-
 .../process/last/LastQueryCollectOperator.java     |   6 +-
 .../process/last/LastQueryMergeOperator.java       |  12 +-
 .../operator/process/last/LastQueryOperator.java   |   7 +-
 .../process/last/LastQuerySortOperator.java        |   7 +-
 .../process/last/UpdateLastCacheOperator.java      |   2 +-
 .../schema/CountGroupByLevelMergeOperator.java     |   6 +-
 .../schema/CountGroupByLevelScanOperator.java      |   6 +-
 .../operator/schema/CountMergeOperator.java        |   7 +-
 .../schema/NodeManageMemoryMergeOperator.java      |   6 +-
 .../operator/schema/NodePathsConvertOperator.java  |   6 +-
 .../operator/schema/NodePathsCountOperator.java    |   6 +-
 .../operator/schema/SchemaCountOperator.java       |   6 +-
 .../operator/schema/SchemaFetchMergeOperator.java  |   6 +-
 .../operator/schema/SchemaFetchScanOperator.java   |   6 +-
 .../operator/schema/SchemaQueryMergeOperator.java  |   6 +-
 .../schema/SchemaQueryOrderByHeatOperator.java     |   6 +-
 .../operator/schema/SchemaQueryScanOperator.java   |   6 +-
 .../operator/sink/IdentitySinkOperator.java        |  34 +-
 .../operator/sink/ShuffleHelperOperator.java       |  39 +-
 .../AbstractSeriesAggregationScanOperator.java     |   6 +-
 .../operator/source/AlignedSeriesScanOperator.java |   6 +-
 .../operator/source/ExchangeOperator.java          |   6 +-
 .../operator/source/LastCacheScanOperator.java     |   6 +-
 .../operator/source/SeriesScanOperator.java        |   6 +-
 .../operator/source/ShowQueriesOperator.java       |   6 +-
 .../db/mpp/transformation/api/YieldableReader.java |   6 +-
 .../transformation/dag/input/IUDFInputDataSet.java |   2 +-
 .../dag/input/QueryDataSetInputLayer.java          |   4 +-
 .../dag/input/TsBlockInputDataSet.java             |   2 +-
 .../MultiInputColumnIntermediateLayer.java         |  10 +-
 ...InputColumnMultiReferenceIntermediateLayer.java |  12 +-
 ...nputColumnSingleReferenceIntermediateLayer.java |  10 +-
 .../dag/transformer/Transformer.java               |   4 +-
 .../dag/transformer/binary/BinaryTransformer.java  |   4 +-
 .../transformer/binary/LogicBinaryTransformer.java |   2 +-
 .../multi/MappableUDFQueryRowTransformer.java      |   2 +-
 .../transformer/multi/UDFQueryRowTransformer.java  |   2 +-
 .../multi/UDFQueryRowWindowTransformer.java        |   2 +-
 .../multi/UniversalUDFQueryTransformer.java        |   4 +-
 .../transformer/ternary/TernaryTransformer.java    |   4 +-
 .../dag/transformer/unary/IsNullTransformer.java   |   2 +-
 .../dag/transformer/unary/UnaryTransformer.java    |   2 +-
 .../unary/scalar/DiffFunctionTransformer.java      |   2 +-
 .../transformation/dag/util/LayerCacheUtils.java   |  10 +-
 .../iotdb/db/mpp/execution/exchange/StubSink.java  |   5 +
 .../operator/AggregationOperatorTest.java          |  26 +-
 .../AlignedSeriesAggregationScanOperatorTest.java  |  38 +-
 .../operator/AlignedSeriesScanOperatorTest.java    |   6 +-
 .../operator/DeviceMergeOperatorTest.java          | 565 ---------------------
 .../execution/operator/DeviceViewOperatorTest.java |   3 +-
 .../mpp/execution/operator/FillOperatorTest.java   |  16 +-
 .../operator/HorizontallyConcatOperatorTest.java   |   2 +-
 .../operator/LastQueryMergeOperatorTest.java       |  38 +-
 .../execution/operator/LastQueryOperatorTest.java  |   4 +-
 .../operator/LastQuerySortOperatorTest.java        |   5 +-
 .../mpp/execution/operator/LimitOperatorTest.java  |   2 +-
 .../execution/operator/LinearFillOperatorTest.java |  56 +-
 .../execution/operator/MergeSortOperatorTest.java  |  38 +-
 .../mpp/execution/operator/OffsetOperatorTest.java |   6 +-
 .../operator/RawDataAggregationOperatorTest.java   |  54 +-
 .../SeriesAggregationScanOperatorTest.java         |  65 ++-
 .../execution/operator/SeriesScanOperatorTest.java |   2 +-
 .../operator/SingleDeviceViewOperatorTest.java     |   3 +-
 .../SlidingWindowAggregationOperatorTest.java      |   3 +-
 .../execution/operator/TimeJoinOperatorTest.java   |   6 +-
 .../operator/UpdateLastCacheOperatorTest.java      |   6 +-
 .../operator/schema/SchemaCountOperatorTest.java   |   4 +-
 .../schema/SchemaQueryScanOperatorTest.java        |   4 +-
 thrift/src/main/thrift/datanode.thrift             |  12 +-
 102 files changed, 674 insertions(+), 970 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index 2fd9a56911..c3be5e1bbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -190,7 +190,16 @@ public abstract class Driver implements IDriver {
   private boolean isFinishedInternal() {
     checkLockHeld("Lock must be held to call isFinishedInternal");
 
-    boolean finished = state.get() != State.ALIVE || driverContext.isDone() || root.isFinished();
+    boolean finished;
+    try {
+      finished =
+          state.get() != State.ALIVE
+              || driverContext.isDone()
+              || root.isFinished()
+              || sink.isClosed();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
     if (finished) {
       state.compareAndSet(State.ALIVE, State.NEED_DESTRUCTION);
     }
@@ -219,7 +228,7 @@ public abstract class Driver implements IDriver {
       List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
       if (interrupterStack == null) {
         driverContext.failed(t);
-        throw t;
+        throw new RuntimeException(t);
       }
 
       // Driver thread was interrupted which should only happen if the task is already finished.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 39cf36a5c0..501781b408 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TCloseSinkChannelEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
@@ -54,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -96,22 +98,16 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             "[ProcessGetTsBlockRequest] sequence ID in [{}, {})",
             req.getStartSequenceId(),
             req.getEndSequenceId());
-        if (!shuffleSinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
-          throw new TException(
-              "Source fragment instance not found. Fragment instance ID: "
-                  + req.getSourceFragmentInstanceId()
-                  + ".");
+        TGetDataBlockResponse resp = new TGetDataBlockResponse(new ArrayList<>());
+        ISinkHandle sinkHandle = shuffleSinkHandles.get(req.getSourceFragmentInstanceId());
+        if (sinkHandle == null) {
+          return resp;
         }
-        TGetDataBlockResponse resp = new TGetDataBlockResponse();
         // index of the channel must be a SinkChannel
-        SinkChannel sinkChannelHandle =
-            (SinkChannel)
-                (shuffleSinkHandles
-                    .get(req.getSourceFragmentInstanceId())
-                    .getChannel(req.getIndex()));
+        SinkChannel sinkChannel = (SinkChannel) (sinkHandle.getChannel(req.getIndex()));
         for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); i++) {
           try {
-            ByteBuffer serializedTsBlock = sinkChannelHandle.getSerializedTsBlock(i);
+            ByteBuffer serializedTsBlock = sinkChannel.getSerializedTsBlock(i);
             resp.addToTsBlocks(serializedTsBlock);
           } catch (IllegalStateException | IOException e) {
             throw new TException(e);
@@ -140,15 +136,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             e.getStartSequenceId(),
             e.getEndSequenceId(),
             e.getSourceFragmentInstanceId());
-        if (!shuffleSinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
+        ISinkHandle sinkHandle = shuffleSinkHandles.get(e.getSourceFragmentInstanceId());
+        if (sinkHandle == null) {
           LOGGER.debug(
               "received ACK event but target FragmentInstance[{}] is not found.",
               e.getSourceFragmentInstanceId());
           return;
         }
         // index of the channel must be a SinkChannel
-        ((SinkChannel)
-                (shuffleSinkHandles.get(e.getSourceFragmentInstanceId()).getChannel(e.getIndex())))
+        ((SinkChannel) (sinkHandle.getChannel(e.getIndex())))
             .acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
       } catch (Throwable t) {
         LOGGER.warn(
@@ -162,6 +158,36 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       }
     }
 
+    @Override
+    public void onCloseSinkChannelEvent(TCloseSinkChannelEvent e) throws TException {
+      try (SetThreadName fragmentInstanceName =
+          new SetThreadName(
+              createFullId(
+                  e.sourceFragmentInstanceId.queryId,
+                  e.sourceFragmentInstanceId.fragmentId,
+                  e.sourceFragmentInstanceId.instanceId))) {
+        LOGGER.debug(
+            "Closed source handle of ShuffleSinkHandle {}, channel index: {}.",
+            e.getSourceFragmentInstanceId(),
+            e.getIndex());
+        ISinkHandle sinkHandle = shuffleSinkHandles.get(e.getSourceFragmentInstanceId());
+        if (sinkHandle == null) {
+          LOGGER.debug(
+              "received CloseSinkChannelEvent but target FragmentInstance[{}] is not found.",
+              e.getSourceFragmentInstanceId());
+          return;
+        }
+        sinkHandle.getChannel(e.getIndex()).close();
+      } catch (Throwable t) {
+        LOGGER.warn(
+            "Close channel of ShuffleSinkHandle {}, index {} failed.",
+            e.getSourceFragmentInstanceId(),
+            e.getIndex(),
+            t);
+        throw t;
+      }
+    }
+
     @Override
     public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
       long startTime = System.nanoTime();
@@ -465,9 +491,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         sourceHandleMap == null ? null : (LocalSourceHandle) sourceHandleMap.get(remotePlanNodeId);
     if (localSourceHandle != null) {
       LOGGER.debug("Get SharedTsBlockQueue from local source handle");
-      queue =
-          ((LocalSourceHandle) sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId))
-              .getSharedTsBlockQueue();
+      queue = localSourceHandle.getSharedTsBlockQueue();
     } else {
       LOGGER.debug("Create SharedTsBlockQueue");
       queue = new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, localMemoryManager);
@@ -623,11 +647,10 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         localPlanNodeId,
         localFragmentInstanceId);
     SharedTsBlockQueue queue;
-    if (shuffleSinkHandles.containsKey(remoteFragmentInstanceId)) {
+    ISinkHandle sinkHandle = shuffleSinkHandles.get(remoteFragmentInstanceId);
+    if (sinkHandle != null) {
       LOGGER.debug("Get SharedTsBlockQueue from local sink handle");
-      queue =
-          ((LocalSinkChannel) shuffleSinkHandles.get(remoteFragmentInstanceId).getChannel(index))
-              .getSharedTsBlockQueue();
+      queue = ((LocalSinkChannel) (sinkHandle.getChannel(index))).getSharedTsBlockQueue();
     } else {
       LOGGER.debug("Create SharedTsBlockQueue");
       queue =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index b62fffc8c6..53d668cc86 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -131,6 +131,10 @@ public class SharedTsBlockQueue {
     return queue.isEmpty();
   }
 
+  public boolean isClosed() {
+    return closed;
+  }
+
   public int getNumOfBufferedTsBlocks() {
     return queue.size();
   }
@@ -147,7 +151,7 @@ public class SharedTsBlockQueue {
   public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
     LOGGER.debug("[SignalNoMoreTsBlockOnQueue]");
     if (closed) {
-      LOGGER.warn("queue has been destroyed");
+      LOGGER.debug("The queue has been destroyed when calling setNoMoreTsBlocks.");
       return;
     }
     this.noMoreTsBlocks = noMoreTsBlocks;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISink.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISink.java
index 5af20abf60..961f49fb70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISink.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISink.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.exchange.sink;
 
+import org.apache.iotdb.db.mpp.execution.driver.Driver;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
@@ -78,6 +79,9 @@ public interface ISink {
    */
   void close();
 
+  /** Return true if this ISink has been closed. Used in {@link Driver#isFinishedInternal()} */
+  boolean isClosed();
+
   /** Set max bytes this ISink can reserve from memory pool */
   void setMaxBytesCanReserve(long maxBytesCanReserve);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
index c600f60eb6..5fb511e402 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
@@ -33,4 +33,7 @@ public interface ISinkHandle extends ISink {
 
   /** Open specified channel of ISinkHandle. */
   void tryOpenChannel(int channelIndex);
+
+  /** Return true if the specified channel is closed. */
+  boolean isChannelClosed(int index);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index 29ff8b7a12..afd043c9af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -225,5 +225,12 @@ public class LocalSinkChannel implements ISinkChannel {
     }
   }
 
+  @Override
+  public boolean isClosed() {
+    synchronized (queue) {
+      return queue.isClosed();
+    }
+  }
+
   // end region
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 12a438d845..7060541acc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -26,12 +26,14 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
 
@@ -46,6 +48,8 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   private final boolean[] channelOpened;
 
+  private final Set<Integer> closedChannel = Sets.newConcurrentHashSet();
+
   private final DownStreamChannelIndex downStreamChannelIndex;
 
   private final int channelNum;
@@ -141,6 +145,11 @@ public class ShuffleSinkHandle implements ISinkHandle {
     }
   }
 
+  @Override
+  public boolean isClosed() {
+    return closedChannel.size() == downStreamChannelList.size();
+  }
+
   @Override
   public synchronized boolean isAborted() {
     return aborted;
@@ -235,6 +244,19 @@ public class ShuffleSinkHandle implements ISinkHandle {
     }
   }
 
+  @Override
+  public boolean isChannelClosed(int index) {
+    if (closedChannel.contains(index)) {
+      return true;
+    } else {
+      if (downStreamChannelList.get(index).isClosed()) {
+        closedChannel.add(index);
+        return true;
+      }
+      return false;
+    }
+  }
+
   // region ============ Shuffle Related ============
   public enum ShuffleStrategyEnum {
     PLAIN,
@@ -281,7 +303,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
     private boolean satisfy(int channelIndex) {
       // downStreamChannel is always an ISinkChannel
       ISinkChannel channel = downStreamChannelList.get(channelIndex);
-      if (channel.isNoMoreTsBlocks()) {
+      if (channel.isNoMoreTsBlocks() || channel.isClosed()) {
         return false;
       }
       return channel.getBufferRetainedSizeInBytes() <= channelMemoryThreshold
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index fbcdaf2e0c..5cd28de462 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -157,6 +157,10 @@ public class SinkChannel implements ISinkChannel {
     long startTime = System.nanoTime();
     try {
       Validate.notNull(tsBlock, "tsBlocks is null");
+      if (closed) {
+        // SinkChannel may have been closed by its downstream SourceHandle
+        return;
+      }
       checkState();
       if (!blocked.isDone()) {
         throw new IllegalStateException("Sink handle is blocked.");
@@ -254,6 +258,11 @@ public class SinkChannel implements ISinkChannel {
     LOGGER.debug("[EndCloseSinkChannel]");
   }
 
+  @Override
+  public boolean isClosed() {
+    return closed;
+  }
+
   @Override
   public synchronized boolean isAborted() {
     return aborted;
@@ -353,8 +362,6 @@ public class SinkChannel implements ISinkChannel {
   private void checkState() {
     if (aborted) {
       throw new IllegalStateException("SinkChannel is aborted.");
-    } else if (closed) {
-      throw new IllegalStateException("SinkChannel is closed.");
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index db462721d0..b66dd4bce4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TCloseSinkChannelEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
@@ -373,6 +374,7 @@ public class SourceHandle implements ISourceHandle {
           .clearMemoryReservationMap(
               localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       closed = true;
+      executorService.submit(new SendCloseSinkChannelEventTask());
       currSequenceId = lastSequenceId + 1;
       sourceHandleListener.onFinished(this);
     }
@@ -490,8 +492,20 @@ public class SourceHandle implements ISourceHandle {
           try (SyncDataNodeMPPDataExchangeServiceClient client =
               mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
             TGetDataBlockResponse resp = client.getDataBlock(req);
-
             int tsBlockNum = resp.getTsBlocks().size();
+            if (tsBlockNum == 0) {
+              if (!closed) {
+                // failed to pull TsBlocks
+                LOGGER.warn(
+                    "{} failed to pull TsBlocks [{}] to [{}] from SinkHandle {}, channel index {},",
+                    localFragmentInstanceId,
+                    startSequenceId,
+                    endSequenceId,
+                    remoteFragmentInstanceId,
+                    indexOfUpstreamSinkHandle);
+              }
+              return;
+            }
             List<ByteBuffer> tsBlocks = new ArrayList<>(tsBlockNum);
             tsBlocks.addAll(resp.getTsBlocks());
 
@@ -620,4 +634,47 @@ public class SourceHandle implements ISourceHandle {
       }
     }
   }
+
+  class SendCloseSinkChannelEventTask implements Runnable {
+
+    @Override
+    public void run() {
+      try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+        LOGGER.debug(
+            "[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
+            remoteFragmentInstanceId,
+            indexOfUpstreamSinkHandle);
+        int attempt = 0;
+        TCloseSinkChannelEvent closeSinkChannelEvent =
+            new TCloseSinkChannelEvent(remoteFragmentInstanceId, indexOfUpstreamSinkHandle);
+        while (attempt < MAX_ATTEMPT_TIMES) {
+          attempt += 1;
+          long startTime = System.nanoTime();
+          try (SyncDataNodeMPPDataExchangeServiceClient client =
+              mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
+            client.onCloseSinkChannelEvent(closeSinkChannelEvent);
+            break;
+          } catch (Throwable e) {
+            LOGGER.warn(
+                "[SendCloseSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
+                remoteFragmentInstanceId,
+                indexOfUpstreamSinkHandle);
+            if (attempt == MAX_ATTEMPT_TIMES) {
+              synchronized (SourceHandle.this) {
+                sourceHandleListener.onFailure(SourceHandle.this, e);
+              }
+            }
+            try {
+              Thread.sleep(retryIntervalInMs);
+            } catch (InterruptedException ex) {
+              Thread.currentThread().interrupt();
+              synchronized (SourceHandle.this) {
+                sourceHandleListener.onFailure(SourceHandle.this, e);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index aaf404d467..d762220da3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -38,7 +38,7 @@ public interface Operator extends AutoCloseable {
     return NOT_BLOCKED;
   }
 
-  default TsBlock nextWithTimer() {
+  default TsBlock nextWithTimer() throws Exception {
     OperatorContext context = getOperatorContext();
     long startTime = System.nanoTime();
 
@@ -51,9 +51,9 @@ public interface Operator extends AutoCloseable {
   }
 
   /** Gets next tsBlock from this operator. If no data is currently available, return null. */
-  TsBlock next();
+  TsBlock next() throws Exception;
 
-  default boolean hasNextWithTimer() {
+  default boolean hasNextWithTimer() throws Exception {
     OperatorContext context = getOperatorContext();
     long startTime = System.nanoTime();
 
@@ -65,7 +65,7 @@ public interface Operator extends AutoCloseable {
   }
 
   /** @return true if the operator has more data, otherwise false */
-  boolean hasNext();
+  boolean hasNext() throws Exception;
 
   /** This method will always be called before releasing the Operator reference. */
   @Override
@@ -74,7 +74,7 @@ public interface Operator extends AutoCloseable {
   /**
    * Is this operator completely finished processing and no more output TsBlock will be produced.
    */
-  boolean isFinished();
+  boolean isFinished() throws Exception;
 
   /**
    * We should also consider the memory used by its children operator, so the calculation logic may
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
index 1ae0fbf801..7c33664327 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
@@ -82,7 +82,7 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
    * @return true if results of all children are ready. Return false if some children is blocked or
    *     return null.
    */
-  protected boolean prepareInput() {
+  protected boolean prepareInput() throws Exception {
     boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (!isEmpty(i)) {
@@ -120,7 +120,7 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator
     }
   }
 
-  protected TsBlock getNextTsBlock(int childIndex) {
+  protected TsBlock getNextTsBlock(int childIndex) throws Exception {
     return children.get(childIndex).nextWithTimer();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 0b5c189c76..3d65c1659c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -131,19 +131,18 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return !finished;
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     checkLastWriteOperation();
 
     if (!processTsBlock(cachedTsBlock)) {
       return null;
     }
     cachedTsBlock = null;
-
     if (child.hasNextWithTimer()) {
       TsBlock inputTsBlock = child.nextWithTimer();
       processTsBlock(inputTsBlock);
@@ -293,7 +292,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return finished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 364086c4fb..0102c0be07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -94,12 +94,12 @@ public class AggregationOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     // start stopwatch
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
@@ -135,7 +135,7 @@ public class AggregationOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !this.hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 888db23336..cb7630dea2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -113,7 +113,7 @@ public class DeviceMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     // get new input TsBlock
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNextWithTimer()) {
@@ -197,7 +197,7 @@ public class DeviceMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (finished) {
       return false;
     }
@@ -224,7 +224,7 @@ public class DeviceMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     if (finished) {
       return true;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 242586a78e..c0af80f7fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
 
 import java.util.List;
 
@@ -107,11 +108,16 @@ public class DeviceViewOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!getCurDeviceOperator().hasNextWithTimer()) {
+      // close finished child
+      getCurDeviceOperator().close();
+      deviceOperators.set(deviceIndex, null);
+      // increment index, move to next child
       deviceIndex++;
       return null;
     }
+
     TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
     if (tsBlock == null) {
       return null;
@@ -138,19 +144,20 @@ public class DeviceViewOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return deviceIndex < devices.size();
   }
 
   @Override
   public void close() throws Exception {
-    for (Operator child : deviceOperators) {
-      child.close();
+    for (int i = deviceIndex, n = deviceOperators.size(); i < n; i++) {
+      Validate.notNull(deviceOperators.get(i));
+      deviceOperators.get(i).close();
     }
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !this.hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index 92ec771f32..c88ab34483 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -57,7 +57,7 @@ public class FillOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock block = child.nextWithTimer();
     if (block == null) {
       return null;
@@ -78,7 +78,7 @@ public class FillOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return child.hasNextWithTimer();
   }
 
@@ -88,7 +88,7 @@ public class FillOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return child.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index c08fcfd27d..0016c7255c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -94,7 +94,7 @@ public class FilterAndProjectOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock input = inputOperator.nextWithTimer();
     if (input == null) {
       return null;
@@ -192,12 +192,12 @@ public class FilterAndProjectOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return inputOperator.hasNextWithTimer();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return inputOperator.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 68604cd1ce..b208e551b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -51,7 +51,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock block = child.nextWithTimer();
     if (block == null) {
       return null;
@@ -67,7 +67,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return remainingLimit > 0 && child.hasNextWithTimer();
   }
 
@@ -77,7 +77,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return remainingLimit == 0 || child.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 3792829196..2d785311a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -82,7 +82,7 @@ public class LinearFillOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
 
     // make sure we call child.next() at most once
     if (cachedTsBlock.isEmpty()) {
@@ -142,7 +142,7 @@ public class LinearFillOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     // if child.hasNext() return false, it means that there is no more tsBlocks
     noMoreTsBlock = !child.hasNextWithTimer();
     // if there is more tsBlock, we can call child.next() once
@@ -156,7 +156,7 @@ public class LinearFillOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return cachedTsBlock.isEmpty() && child.isFinished();
   }
 
@@ -208,7 +208,7 @@ public class LinearFillOperator implements ProcessOperator {
   /**
    * @return true if we succeed to get next TsBlock and add it into cachedTsBlock, otherwise false
    */
-  private boolean tryToGetNextTsBlock() {
+  private boolean tryToGetNextTsBlock() throws Exception {
     if (canCallNext) { // if we can call child.next(), we call that and cache it in
       // cachedTsBlock
       canCallNext = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index 58b5af4a60..0fdc3f2630 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -83,7 +83,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     // start stopwatch
     long startTime = System.nanoTime();
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
@@ -144,7 +144,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (finished) {
       return false;
     }
@@ -164,7 +164,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     if (finished) {
       return true;
     }
@@ -217,7 +217,7 @@ public class MergeSortOperator extends AbstractConsumeAllOperator {
    *     some children is blocked or return null.
    */
   @Override
-  protected boolean prepareInput() {
+  protected boolean prepareInput() throws Exception {
     boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (noMoreTsBlocks[i] || !isEmpty(i)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index e6f146eac1..37127e82aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -51,7 +51,7 @@ public class OffsetOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock block = child.nextWithTimer();
     if (block == null) {
       return null;
@@ -66,7 +66,7 @@ public class OffsetOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return child.hasNextWithTimer();
   }
 
@@ -76,7 +76,7 @@ public class OffsetOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return child.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 97e6f6d93c..ed765d9b47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -73,19 +73,19 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
     this.resultTsBlockBuilder = windowManager.createResultTsBlockBuilder(aggregators);
   }
 
-  private boolean hasMoreData() {
+  private boolean hasMoreData() throws Exception {
     return !(inputTsBlock == null || inputTsBlock.isEmpty())
         || child.hasNextWithTimer()
         || hasCachedDataInAggregator;
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return windowManager.hasNext(hasMoreData());
   }
 
   @Override
-  protected boolean calculateNextAggregationResult() {
+  protected boolean calculateNextAggregationResult() throws Exception {
 
     // if needSkip is true, just get the tsBlock directly.
     while (needSkip || !calculateFromRawData()) {
@@ -107,7 +107,8 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
         if (windowManager.notInitializedLastTimeWindow()) {
           initWindowAndAggregators();
         }
-        // If the window is not initialized, it just returns to avoid invoking updateResultTsBlock()
+        // If the window is not initialized, it just returns to avoid invoking
+        // updateResultTsBlock()
         // but if it's skipping the last window, just break and keep skipping.
         if (needSkip || windowManager.isCurWindowInit()) break;
         return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
index a22d52c9e2..51464b742c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
@@ -81,7 +81,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock tsBlock = deviceOperator.nextWithTimer();
     if (tsBlock == null) {
       return null;
@@ -103,7 +103,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return deviceOperator.hasNextWithTimer();
   }
 
@@ -113,7 +113,7 @@ public class SingleDeviceViewOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !this.hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 0fb561e1e3..40938e335e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -72,7 +72,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     // start stopwatch
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
@@ -97,7 +97,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !this.hasNextWithTimer();
   }
 
@@ -106,7 +106,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
     child.close();
   }
 
-  protected abstract boolean calculateNextAggregationResult();
+  protected abstract boolean calculateNextAggregationResult() throws Exception;
 
   protected abstract void updateResultTsBlock();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 3fb0dca85a..3e8871da00 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -71,12 +71,12 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
   }
 
   @Override
-  protected boolean calculateNextAggregationResult() {
+  protected boolean calculateNextAggregationResult() throws Exception {
     if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
       // move to next time window
       curTimeRange = timeRangeIterator.nextTimeRange();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 16c190a25e..ae61601063 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -64,7 +64,7 @@ public class SortOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock tsBlock = inputOperator.nextWithTimer();
     if (tsBlock == null) {
       return null;
@@ -103,7 +103,7 @@ public class SortOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return inputOperator.hasNextWithTimer();
   }
 
@@ -113,7 +113,7 @@ public class SortOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return cachedData == null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index 9cc3e37a63..0360028c2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -81,7 +81,7 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
     while (System.nanoTime() - start < maxRuntime && !tsBlockBuilder.isFull()) {
@@ -145,12 +145,12 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return !isEmpty(readyChildIndex) || children.get(readyChildIndex).hasNextWithTimer();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !this.hasNextWithTimer();
   }
 
@@ -176,7 +176,7 @@ public class TagAggregationOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  protected TsBlock getNextTsBlock(int childIndex) {
+  protected TsBlock getNextTsBlock(int childIndex) throws Exception {
     consumedIndices[childIndex] = 0;
     return children.get(childIndex).nextWithTimer();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 9f0a0113f5..751c121911 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -140,8 +140,7 @@ public class TransformOperator implements ProcessOperator {
     }
   }
 
-  protected YieldableState iterateAllColumnsToNextValid()
-      throws QueryProcessException, IOException {
+  protected YieldableState iterateAllColumnsToNextValid() throws Exception {
     for (int i = 0, n = shouldIterateReadersToNextValid.length; i < n; ++i) {
       if (shouldIterateReadersToNextValid[i]) {
         final YieldableState yieldableState = iterateReaderToNextValid(transformers[i]);
@@ -154,8 +153,7 @@ public class TransformOperator implements ProcessOperator {
     return YieldableState.YIELDABLE;
   }
 
-  protected YieldableState iterateReaderToNextValid(LayerPointReader reader)
-      throws QueryProcessException, IOException {
+  protected YieldableState iterateReaderToNextValid(LayerPointReader reader) throws Exception {
     // Since a constant operand is not allowed to be a result column, the reader will not be
     // a ConstantLayerPointReader.
     // If keepNull is false, we must iterate the reader until a non-null row is returned.
@@ -172,7 +170,7 @@ public class TransformOperator implements ProcessOperator {
   }
 
   @Override
-  public final boolean hasNext() {
+  public final boolean hasNext() throws Exception {
     if (!timeHeap.isEmpty()) {
       return true;
     }
@@ -188,7 +186,7 @@ public class TransformOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
 
     try {
       YieldableState yieldableState = iterateAllColumnsToNextValid();
@@ -255,7 +253,7 @@ public class TransformOperator implements ProcessOperator {
   }
 
   protected boolean collectReaderAppendIsNull(LayerPointReader reader, long currentTime)
-      throws QueryProcessException, IOException {
+      throws Exception {
     final YieldableState yieldableState = reader.yield();
 
     if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
@@ -275,7 +273,7 @@ public class TransformOperator implements ProcessOperator {
 
   protected YieldableState collectDataPoint(
       LayerPointReader reader, ColumnBuilder writer, long currentTime, int readerIndex)
-      throws QueryProcessException, IOException {
+      throws Exception {
     final YieldableState yieldableState = reader.yield();
     if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
       writer.appendNull();
@@ -336,7 +334,7 @@ public class TransformOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     // call hasNext first, or data of inputOperator could be missing
     boolean flag = !hasNextWithTimer();
     return timeHeap.isEmpty() && (flag || inputOperator.isFinished());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index dadd3760af..f26fb7dfb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -63,7 +63,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!prepareInput()) {
       return null;
     }
@@ -106,7 +106,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (finished) {
       return false;
     }
@@ -121,7 +121,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     if (finished) {
       return true;
     }
@@ -173,7 +173,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  protected TsBlock getNextTsBlock(int childIndex) {
+  protected TsBlock getNextTsBlock(int childIndex) throws Exception {
     inputIndex[childIndex] = 0;
     return children.get(childIndex).nextWithTimer();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index aca522cc57..0cc36c5e51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -120,7 +120,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (retainedTsBlock != null) {
       return getResultFromRetainedTsBlock();
     }
@@ -184,7 +184,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (finished) {
       return false;
     }
@@ -207,7 +207,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     if (finished) {
       return true;
     }
@@ -269,7 +269,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
    *     some children is blocked or return null.
    */
   @Override
-  protected boolean prepareInput() {
+  protected boolean prepareInput() throws Exception {
     boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (noMoreTsBlocks[i] || !isEmpty(i)) {
@@ -288,6 +288,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
           noMoreTsBlocks[i] = true;
           inputTsBlocks[i] = null;
         }
+
       } else {
         allReady = false;
       }
@@ -301,7 +302,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
   }
 
   @Override
-  protected TsBlock getNextTsBlock(int childIndex) {
+  protected TsBlock getNextTsBlock(int childIndex) throws Exception {
     inputIndex[childIndex] = 0;
     return children.get(childIndex).nextWithTimer();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index f945b700e4..2b60ad6d3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -119,7 +119,7 @@ public class TimeJoinOperator extends AbstractOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (retainedTsBlock != null) {
       return getResultFromRetainedTsBlock();
     }
@@ -147,7 +147,8 @@ public class TimeJoinOperator extends AbstractOperator {
             // In such case, TimeJoinOperator can't go on calculating, so we just return null.
             // We can also use the while loop here to continuously call the hasNext() and next()
             // methods of the child operator until its hasNext() returns false or the next() gets
-            // the data that is not empty, but this will cause the execution time of the while loop
+            // the data that is not empty, but this will cause the execution time of the while
+            // loop
             // to be uncontrollable and may exceed all allocated time slice
             return null;
           }
@@ -198,7 +199,7 @@ public class TimeJoinOperator extends AbstractOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (finished) {
       return false;
     }
@@ -228,7 +229,7 @@ public class TimeJoinOperator extends AbstractOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     if (finished) {
       return true;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index 60095e9365..f0b1ffc49a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -81,12 +81,12 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return child.hasNextWithTimer();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return child.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index f78c19e4fe..a75581eb03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -50,7 +50,7 @@ public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOpera
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock res = child.nextWithTimer();
     if (res == null) {
       return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index b69ecfa388..199d12b467 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -60,7 +60,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (children.get(currentIndex).hasNextWithTimer()) {
       return children.get(currentIndex).nextWithTimer();
     } else {
@@ -70,7 +70,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return currentIndex < inputOperatorsCount;
   }
 
@@ -82,7 +82,7 @@ public class LastQueryCollectOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index fcc074b024..af4517d246 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -104,7 +104,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
 
     // end time series for returned TsBlock this time, it's the min/max end time series among all
     // the children
@@ -133,10 +133,12 @@ public class LastQueryMergeOperator implements ProcessOperator {
           } else {
             // child operator has next but return an empty TsBlock which means that it may not
             // finish calculation in given time slice.
-            // In such case, LastQueryMergeOperator can't go on calculating, so we just return null.
+            // In such case, LastQueryMergeOperator can't go on calculating, so we just return
+            // null.
             // We can also use the while loop here to continuously call the hasNext() and next()
             // methods of the child operator until its hasNext() returns false or the next() gets
-            // the data that is not empty, but this will cause the execution time of the while loop
+            // the data that is not empty, but this will cause the execution time of the while
+            // loop
             // to be uncontrollable and may exceed all allocated time slice
             return null;
           }
@@ -179,7 +181,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (finished) {
       return false;
     }
@@ -207,7 +209,7 @@ public class LastQueryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     if (finished) {
       return true;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index c0f825e02a..db4a7e95dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -85,7 +85,7 @@ public class LastQueryOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
 
     // we have consumed up data from children Operator, just return all remaining cached data in
     // tsBlockBuilder
@@ -112,6 +112,7 @@ public class LastQueryOperator implements ProcessOperator {
           LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
         }
       }
+
       currentIndex++;
     }
 
@@ -121,12 +122,12 @@ public class LastQueryOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return currentIndex < inputOperatorsCount || !tsBlockBuilder.isEmpty();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 09779cf106..5a4fbd7c0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -110,7 +110,7 @@ public class LastQuerySortOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     // we have consumed up data from children Operator, just return all remaining cached data in
     // cachedTsBlock, tsBlockBuilder and previousTsBlock
     if (currentIndex >= inputOperatorsCount) {
@@ -152,6 +152,7 @@ public class LastQuerySortOperator implements ProcessOperator {
             return null;
           }
         }
+
         currentIndex++;
       }
       if (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
@@ -169,7 +170,7 @@ public class LastQuerySortOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return currentIndex < inputOperatorsCount
         || cachedTsBlockRowIndex < cachedTsBlockSize
         || !tsBlockBuilder.isEmpty()
@@ -185,7 +186,7 @@ public class LastQuerySortOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 69972a40e2..7bcc5a61e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -52,7 +52,7 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock res = child.nextWithTimer();
     if (res == null) {
       return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
index c70264a549..a1ec71f1a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelMergeOperator.java
@@ -85,7 +85,7 @@ public class CountGroupByLevelMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
@@ -146,12 +146,12 @@ public class CountGroupByLevelMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return resultTsBlockList == null || currentIndex < resultTsBlockList.size();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
index 0e3746e7b3..f852492778 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
@@ -80,7 +80,7 @@ public class CountGroupByLevelScanOperator<T extends ISchemaInfo> implements Sou
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
@@ -88,7 +88,7 @@ public class CountGroupByLevelScanOperator<T extends ISchemaInfo> implements Sou
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (schemaReader == null) {
       schemaReader = createTimeSeriesReader();
     }
@@ -140,7 +140,7 @@ public class CountGroupByLevelScanOperator<T extends ISchemaInfo> implements Sou
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 5042ab3460..ba62cb6d01 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -76,10 +76,11 @@ public class CountMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
+
     if (resultTsBlockList != null) {
       currentIndex++;
       return resultTsBlockList.get(currentIndex - 1);
@@ -122,12 +123,12 @@ public class CountMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return resultTsBlockList == null || currentIndex < resultTsBlockList.size();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index 7124b6d27c..19e89ea1f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -74,7 +74,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (isReadingMemory) {
       isReadingMemory = false;
       return transferToTsBlock(data);
@@ -125,7 +125,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return isReadingMemory || child.hasNextWithTimer();
   }
 
@@ -135,7 +135,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !isReadingMemory && child.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index 4d04a62e86..10b1d2bf30 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -69,7 +69,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock block = child.nextWithTimer();
     if (block == null || block.isEmpty()) {
       return null;
@@ -94,7 +94,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return child.hasNextWithTimer();
   }
 
@@ -104,7 +104,7 @@ public class NodePathsConvertOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return child.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 3a662afe13..a68cc9b6b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -64,7 +64,7 @@ public class NodePathsCountOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     while (!child.isFinished()) {
       // read as much child result as possible
       ListenableFuture<?> blocked = child.isBlocked();
@@ -91,7 +91,7 @@ public class NodePathsCountOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return !child.isFinished() || !isFinished;
   }
 
@@ -106,7 +106,7 @@ public class NodePathsCountOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return isFinished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
index 5e313e9fea..d91caee311 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
@@ -69,7 +69,7 @@ public class SchemaCountOperator<T extends ISchemaInfo> implements SourceOperato
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
@@ -94,12 +94,12 @@ public class SchemaCountOperator<T extends ISchemaInfo> implements SourceOperato
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return !isFinished;
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return isFinished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 977cd388df..6be4893d4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -66,7 +66,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (isReadingStorageGroupInfo) {
       isReadingStorageGroupInfo = false;
       return generateStorageGroupInfo();
@@ -81,7 +81,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return isReadingStorageGroupInfo || currentIndex < childrenCount;
   }
 
@@ -93,7 +93,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
index 0f406e1385..79ee0c381e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java
@@ -81,7 +81,7 @@ public class SchemaFetchScanOperator implements SourceOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
@@ -95,12 +95,12 @@ public class SchemaFetchScanOperator implements SourceOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return !isFinished;
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return isFinished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index 0ad36268a5..58ce3b2d2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -50,7 +50,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (children.get(currentIndex).hasNextWithTimer()) {
       return children.get(currentIndex).nextWithTimer();
     } else {
@@ -60,7 +60,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return currentIndex < children.size();
   }
 
@@ -70,7 +70,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 5fc7a1b681..64ba3314e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -70,7 +70,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
@@ -187,7 +187,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return resultTsBlockList == null || currentIndex < resultTsBlockList.size();
   }
 
@@ -199,7 +199,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index ee27182135..2c261dbb2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -126,7 +126,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
@@ -146,7 +146,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (schemaReader == null) {
       schemaReader = createSchemaReader();
     }
@@ -154,7 +154,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
index 01ecb7f71b..e4b44508d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/IdentitySinkOperator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
 
 import java.util.List;
 
@@ -54,13 +55,22 @@ public class IdentitySinkOperator implements Operator {
   }
 
   @Override
-  public boolean hasNext() {
-    if (children.get(downStreamChannelIndex.getCurrentIndex()).hasNext()) {
+  public boolean hasNext() throws Exception {
+    int currentIndex = downStreamChannelIndex.getCurrentIndex();
+    boolean currentChannelClosed = sinkHandle.isChannelClosed(currentIndex);
+    if (!currentChannelClosed && children.get(currentIndex).hasNext()) {
       return true;
+    } else if (currentChannelClosed) {
+      // we close the child directly. The child could be an ExchangeOperator which is the downstream
+      // of an ISinkChannel of a pipeline driver.
+      closeCurrentChild(currentIndex);
+    } else {
+      // current child has no more data
+      closeCurrentChild(currentIndex);
+      sinkHandle.setNoMoreTsBlocksOfOneChannel(downStreamChannelIndex.getCurrentIndex());
     }
-    int currentIndex = downStreamChannelIndex.getCurrentIndex();
-    // current channel have no more data
-    sinkHandle.setNoMoreTsBlocksOfOneChannel(downStreamChannelIndex.getCurrentIndex());
+
+    // increment the index
     currentIndex++;
     if (currentIndex >= children.size()) {
       isFinished = true;
@@ -76,8 +86,13 @@ public class IdentitySinkOperator implements Operator {
     return true;
   }
 
+  private void closeCurrentChild(int index) throws Exception {
+    children.get(index).close();
+    children.set(index, null);
+  }
+
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (needToReturnNull) {
       needToReturnNull = false;
       return null;
@@ -91,7 +106,7 @@ public class IdentitySinkOperator implements Operator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return isFinished;
   }
 
@@ -102,8 +117,9 @@ public class IdentitySinkOperator implements Operator {
 
   @Override
   public void close() throws Exception {
-    for (Operator child : children) {
-      child.close();
+    for (int i = downStreamChannelIndex.getCurrentIndex(), n = children.size(); i < n; i++) {
+      Validate.notNull(children.get(i));
+      children.get(i).close();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
index 2e7863eb0f..1a8790b661 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/sink/ShuffleHelperOperator.java
@@ -59,13 +59,20 @@ public class ShuffleHelperOperator implements Operator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     int currentIndex = downStreamChannelIndex.getCurrentIndex();
-    if (children.get(currentIndex).hasNext()) {
+    boolean currentChannelClosed = sinkHandle.isChannelClosed(currentIndex);
+    if (!currentChannelClosed && children.get(currentIndex).hasNext()) {
       return true;
+    } else if (currentChannelClosed) {
+      // we close the child directly. The child could be an ExchangeOperator which is the downstream
+      // of an ISinkChannel of a pipeline driver.
+      closeCurrentChild(currentIndex);
+    } else {
+      // current channel has no more data
+      closeCurrentChild(currentIndex);
+      sinkHandle.setNoMoreTsBlocksOfOneChannel(currentIndex);
     }
-    // current channel have no more data
-    sinkHandle.setNoMoreTsBlocksOfOneChannel(currentIndex);
     unfinishedChildren.remove(currentIndex);
     currentIndex = (currentIndex + 1) % children.size();
     downStreamChannelIndex.setCurrentIndex(currentIndex);
@@ -78,8 +85,13 @@ public class ShuffleHelperOperator implements Operator {
     return true;
   }
 
+  private void closeCurrentChild(int index) throws Exception {
+    children.get(index).close();
+    children.set(index, null);
+  }
+
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (needToReturnNull) {
       needToReturnNull = false;
       return null;
@@ -89,11 +101,20 @@ public class ShuffleHelperOperator implements Operator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    return children.get(downStreamChannelIndex.getCurrentIndex()).isBlocked();
+    int steps = 0;
+    int currentIndex = downStreamChannelIndex.getCurrentIndex();
+    // skip closed children
+    while (children.get(currentIndex) == null && steps < children.size()) {
+      currentIndex = (currentIndex + 1) % children.size();
+      steps++;
+    }
+    downStreamChannelIndex.setCurrentIndex(currentIndex);
+    Operator child = children.get(currentIndex);
+    return child == null ? NOT_BLOCKED : child.isBlocked();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return unfinishedChildren.isEmpty();
   }
 
@@ -105,7 +126,9 @@ public class ShuffleHelperOperator implements Operator {
   @Override
   public void close() throws Exception {
     for (Operator child : children) {
-      child.close();
+      if (child != null) {
+        child.close();
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index afb2957f93..67b42a4319 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -113,12 +113,12 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return timeRangeIterator.hasNextTimeRange();
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     // start stopwatch
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
@@ -148,7 +148,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return finished || (finished = !hasNextWithTimer());
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
index 47830d42ed..ebc407ea97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java
@@ -62,7 +62,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (retainedTsBlock != null) {
       return getResultFromRetainedTsBlock();
     }
@@ -72,7 +72,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (retainedTsBlock != null) {
       return true;
     }
@@ -117,7 +117,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return finished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 8699c3e135..27ef9a3022 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -69,17 +69,17 @@ public class ExchangeOperator implements SourceOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     return sourceHandle.receive();
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return !sourceHandle.isFinished();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return sourceHandle.isFinished();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
index 6c5185fbf2..eb975d3469 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -41,19 +41,19 @@ public class LastCacheScanOperator implements SourceOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock res = tsBlock;
     tsBlock = null;
     return res;
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     return tsBlock != null && !tsBlock.isEmpty();
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return !hasNextWithTimer();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 4032dea5e2..2a315920f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -129,7 +129,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     if (retainedTsBlock != null) {
       return getResultFromRetainedTsBlock();
     }
@@ -139,7 +139,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (retainedTsBlock != null) {
       return true;
     }
@@ -184,7 +184,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return finished;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
index 0a9c86d0c2..ef930f8400 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ShowQueriesOperator.java
@@ -60,7 +60,7 @@ public class ShowQueriesOperator implements SourceOperator {
   }
 
   @Override
-  public TsBlock next() {
+  public TsBlock next() throws Exception {
     TsBlock res = tsBlock;
     hasConsumed = true;
     tsBlock = null;
@@ -68,7 +68,7 @@ public class ShowQueriesOperator implements SourceOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws Exception {
     if (hasConsumed) {
       return false;
     }
@@ -79,7 +79,7 @@ public class ShowQueriesOperator implements SourceOperator {
   }
 
   @Override
-  public boolean isFinished() {
+  public boolean isFinished() throws Exception {
     return hasConsumed;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/api/YieldableReader.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/api/YieldableReader.java
index 263254a5f3..f61dd44114 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/api/YieldableReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/api/YieldableReader.java
@@ -19,11 +19,7 @@
 
 package org.apache.iotdb.db.mpp.transformation.api;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-
-import java.io.IOException;
-
 public interface YieldableReader {
 
-  YieldableState yield() throws IOException, QueryProcessException;
+  YieldableState yield() throws Exception;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/IUDFInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/IUDFInputDataSet.java
index 4f640e384c..db1272cffd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/IUDFInputDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/IUDFInputDataSet.java
@@ -35,7 +35,7 @@ public interface IUDFInputDataSet {
   boolean hasNextRowInObjects() throws IOException;
 
   /** Whether the data set has next row. */
-  default YieldableState canYieldNextRowInObjects() throws IOException {
+  default YieldableState canYieldNextRowInObjects() throws Exception {
     return hasNextRowInObjects()
         ? YieldableState.YIELDABLE
         : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
index 1aadf50b09..7825b2f423 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/QueryDataSetInputLayer.java
@@ -113,7 +113,7 @@ public class QueryDataSetInputLayer {
     }
 
     @Override
-    public YieldableState yield() throws IOException, QueryProcessException {
+    public YieldableState yield() throws Exception {
       if (hasCachedRowRecord) {
         return YieldableState.YIELDABLE;
       }
@@ -231,7 +231,7 @@ public class QueryDataSetInputLayer {
   private class TimePointReader extends AbstractLayerPointReader {
 
     @Override
-    public YieldableState yield() throws IOException, QueryProcessException {
+    public YieldableState yield() throws Exception {
       if (hasCachedRowRecord) {
         return YieldableState.YIELDABLE;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
index 8f6629aba3..9bb3593441 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/input/TsBlockInputDataSet.java
@@ -50,7 +50,7 @@ public class TsBlockInputDataSet implements IUDFInputDataSet {
   }
 
   @Override
-  public YieldableState canYieldNextRowInObjects() {
+  public YieldableState canYieldNextRowInObjects() throws Exception {
     if (tsBlockRowIterator == null) {
       if (operator.isBlocked() != Operator.NOT_BLOCKED) {
         return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index b2c9c731f0..9c16557c5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -165,7 +165,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
   }
 
   @Override
-  public YieldableState canYieldNextRowInObjects() throws IOException {
+  public YieldableState canYieldNextRowInObjects() throws Exception {
     if (cachedRow != null) {
       return YieldableState.YIELDABLE;
     }
@@ -301,7 +301,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
       private boolean currentNull = false;
 
       @Override
-      public YieldableState yield() throws IOException {
+      public YieldableState yield() throws Exception {
         if (hasCached) {
           return YieldableState.YIELDABLE;
         }
@@ -384,7 +384,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
       private int beginIndex = -slidingStep;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (hasCached) {
           return YieldableState.YIELDABLE;
         }
@@ -517,7 +517,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
       private int nextIndexBegin = 0;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (isFirstIteration) {
           if (rowRecordList.size() == 0 && nextWindowTimeBegin == Long.MIN_VALUE) {
             final YieldableState yieldableState =
@@ -694,7 +694,7 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
       private int nextIndexEnd = 1;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (isFirstIteration) {
           if (rowRecordList.size() == 0) {
             final YieldableState yieldableState =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 165a079120..38e474aaba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -90,7 +90,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       }
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (hasCached) {
           return YieldableState.YIELDABLE;
         }
@@ -192,7 +192,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       private int currentRowIndex = -1;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (hasCached) {
           return YieldableState.YIELDABLE;
         }
@@ -273,7 +273,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       private int beginIndex = -slidingStep;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (hasCached) {
           return YieldableState.YIELDABLE;
         }
@@ -402,7 +402,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       private boolean hasAtLeastOneRow;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (isFirstIteration) {
           if (tvList.size() == 0) {
             final YieldableState yieldableState =
@@ -583,7 +583,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       private int nextIndexEnd = 1;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (isFirstIteration) {
           if (tvList.size() == 0) {
             final YieldableState yieldableState =
@@ -701,7 +701,7 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       private ValueRecorder valueRecorder = new ValueRecorder();
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (isFirstIteration) {
           if (tvList.size() == 0) {
             final YieldableState yieldableState =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index d6bd270c41..0ff38822d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -78,7 +78,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       private boolean isCurrentNull = false;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (!hasCached) {
           final YieldableState yieldableState = parentLayerPointReader.yield();
           if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -151,7 +151,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       private int beginIndex = -slidingStep;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (hasCached) {
           return YieldableState.YIELDABLE;
         }
@@ -277,7 +277,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       private int nextIndexBegin = 0;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (isFirstIteration) {
           if (tvList.size() == 0) {
             final YieldableState yieldableState =
@@ -457,7 +457,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       private int nextIndexEnd = 1;
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (isFirstIteration) {
           if (tvList.size() == 0) {
             final YieldableState yieldableState =
@@ -575,7 +575,7 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       private ValueRecorder valueRecorder = new ValueRecorder();
 
       @Override
-      public YieldableState yield() throws IOException, QueryProcessException {
+      public YieldableState yield() throws Exception {
         if (isFirstIteration) {
           if (tvList.size() == 0) {
             final YieldableState yieldableState =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
index 28fb5f9cae..d99a5c8404 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/Transformer.java
@@ -56,7 +56,7 @@ public abstract class Transformer implements LayerPointReader {
   protected abstract boolean cacheValue() throws QueryProcessException, IOException;
 
   @Override
-  public final YieldableState yield() throws IOException, QueryProcessException {
+  public final YieldableState yield() throws Exception {
     if (hasCachedValue) {
       return YieldableState.YIELDABLE;
     }
@@ -71,7 +71,7 @@ public abstract class Transformer implements LayerPointReader {
   /**
    * if this method returns YieldableState.YIELDABLE, at least one of the cached field should be set
    */
-  protected abstract YieldableState yieldValue() throws QueryProcessException, IOException;
+  protected abstract YieldableState yieldValue() throws Exception;
 
   @Override
   public final void readyForNext() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/BinaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/BinaryTransformer.java
index 761c78fcc7..fbfaad56bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/BinaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/BinaryTransformer.java
@@ -59,7 +59,7 @@ public abstract class BinaryTransformer extends Transformer {
   }
 
   @Override
-  public YieldableState yieldValue() throws IOException, QueryProcessException {
+  public YieldableState yieldValue() throws Exception {
     final YieldableState leftYieldableState = leftPointReader.yield();
     final YieldableState rightYieldableState = rightPointReader.yield();
 
@@ -89,7 +89,7 @@ public abstract class BinaryTransformer extends Transformer {
     return YieldableState.YIELDABLE;
   }
 
-  private YieldableState yieldTime() throws IOException, QueryProcessException {
+  private YieldableState yieldTime() throws Exception {
     if (isCurrentConstant) {
       return YieldableState.YIELDABLE;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/LogicBinaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/LogicBinaryTransformer.java
index 6440e39585..839f729cf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/LogicBinaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/binary/LogicBinaryTransformer.java
@@ -43,7 +43,7 @@ public abstract class LogicBinaryTransformer extends BinaryTransformer {
   }
 
   @Override
-  public YieldableState yieldValue() throws QueryProcessException, IOException {
+  public YieldableState yieldValue() throws Exception {
     final YieldableState leftYieldableState = leftPointReader.yield();
     final YieldableState rightYieldableState = rightPointReader.yield();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/MappableUDFQueryRowTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/MappableUDFQueryRowTransformer.java
index c783d32294..7be1849b9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/MappableUDFQueryRowTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/MappableUDFQueryRowTransformer.java
@@ -63,7 +63,7 @@ public class MappableUDFQueryRowTransformer extends UDFQueryTransformer {
   }
 
   @Override
-  protected YieldableState yieldValue() throws QueryProcessException, IOException {
+  protected YieldableState yieldValue() throws Exception {
     final YieldableState yieldableState = layerRowReader.yield();
     if (!YieldableState.YIELDABLE.equals(yieldableState)) {
       return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowTransformer.java
index 9128d9a1c6..4fc9953d28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowTransformer.java
@@ -36,7 +36,7 @@ public class UDFQueryRowTransformer extends UniversalUDFQueryTransformer {
   }
 
   @Override
-  protected YieldableState tryExecuteUDFOnce() throws QueryProcessException, IOException {
+  protected YieldableState tryExecuteUDFOnce() throws Exception {
     final YieldableState yieldableState = layerRowReader.yield();
     if (yieldableState != YieldableState.YIELDABLE) {
       return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowWindowTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowWindowTransformer.java
index 9776390953..fb045b710f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowWindowTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UDFQueryRowWindowTransformer.java
@@ -37,7 +37,7 @@ public class UDFQueryRowWindowTransformer extends UniversalUDFQueryTransformer {
   }
 
   @Override
-  protected YieldableState tryExecuteUDFOnce() throws QueryProcessException, IOException {
+  protected YieldableState tryExecuteUDFOnce() throws Exception {
     final YieldableState yieldableState = layerRowWindowReader.yield();
     if (yieldableState != YieldableState.YIELDABLE) {
       return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java
index d19f0d2740..4e60a3e4b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/multi/UniversalUDFQueryTransformer.java
@@ -39,7 +39,7 @@ public abstract class UniversalUDFQueryTransformer extends UDFQueryTransformer {
   }
 
   @Override
-  protected final YieldableState yieldValue() throws QueryProcessException, IOException {
+  protected final YieldableState yieldValue() throws Exception {
     while (!cacheValueFromUDFOutput()) {
       final YieldableState udfYieldableState = tryExecuteUDFOnce();
       if (udfYieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -62,7 +62,7 @@ public abstract class UniversalUDFQueryTransformer extends UDFQueryTransformer {
     return true;
   }
 
-  protected abstract YieldableState tryExecuteUDFOnce() throws QueryProcessException, IOException;
+  protected abstract YieldableState tryExecuteUDFOnce() throws Exception;
 
   protected abstract boolean executeUDFOnce() throws QueryProcessException, IOException;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/TernaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/TernaryTransformer.java
index 1e72272cfc..07c7d6911f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/TernaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/ternary/TernaryTransformer.java
@@ -45,7 +45,7 @@ public abstract class TernaryTransformer extends Transformer {
   protected final boolean isCurrentConstant;
 
   @Override
-  protected YieldableState yieldValue() throws QueryProcessException, IOException {
+  protected YieldableState yieldValue() throws Exception {
     final YieldableState firstYieldableState = firstPointReader.yield();
     final YieldableState secondYieldableState = secondPointReader.yield();
     final YieldableState thirdYieldableState = thirdPointReader.yield();
@@ -81,7 +81,7 @@ public abstract class TernaryTransformer extends Transformer {
     return YieldableState.YIELDABLE;
   }
 
-  private YieldableState yieldTime() throws IOException, QueryProcessException {
+  private YieldableState yieldTime() throws Exception {
     if (isCurrentConstant) {
       return YieldableState.YIELDABLE;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/IsNullTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/IsNullTransformer.java
index f772276566..56281a315c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/IsNullTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/IsNullTransformer.java
@@ -40,7 +40,7 @@ public class IsNullTransformer extends UnaryTransformer {
   }
 
   @Override
-  public final YieldableState yieldValue() throws IOException, QueryProcessException {
+  public final YieldableState yieldValue() throws Exception {
     final YieldableState yieldableState = layerPointReader.yield();
     if (!YieldableState.YIELDABLE.equals(yieldableState)) {
       return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/UnaryTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/UnaryTransformer.java
index 65d6a52776..72079689cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/UnaryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/UnaryTransformer.java
@@ -45,7 +45,7 @@ public abstract class UnaryTransformer extends Transformer {
   }
 
   @Override
-  public YieldableState yieldValue() throws IOException, QueryProcessException {
+  public YieldableState yieldValue() throws Exception {
     final YieldableState yieldableState = layerPointReader.yield();
     if (!YieldableState.YIELDABLE.equals(yieldableState)) {
       return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/scalar/DiffFunctionTransformer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/scalar/DiffFunctionTransformer.java
index 476b71b4c6..a237a19147 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/scalar/DiffFunctionTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/transformer/unary/scalar/DiffFunctionTransformer.java
@@ -47,7 +47,7 @@ public class DiffFunctionTransformer extends UnaryTransformer {
   }
 
   @Override
-  public final YieldableState yieldValue() throws IOException, QueryProcessException {
+  public final YieldableState yieldValue() throws Exception {
     final YieldableState yieldableState = layerPointReader.yield();
     if (!YieldableState.YIELDABLE.equals(yieldableState)) {
       return yieldableState;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/LayerCacheUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/LayerCacheUtils.java
index 791628ec7b..2725af36a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/LayerCacheUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/LayerCacheUtils.java
@@ -39,7 +39,7 @@ public class LayerCacheUtils {
       LayerPointReader source,
       ElasticSerializableTVList target,
       int pointNumber)
-      throws QueryProcessException, IOException {
+      throws Exception {
     int count = 0;
     while (count < pointNumber) {
       final YieldableState yieldableState = yieldPoint(dataType, source, target);
@@ -53,7 +53,7 @@ public class LayerCacheUtils {
 
   public static YieldableState yieldPoint(
       TSDataType dataType, LayerPointReader source, ElasticSerializableTVList target)
-      throws IOException, QueryProcessException {
+      throws Exception {
     final YieldableState yieldableState = source.yield();
     if (yieldableState != YieldableState.YIELDABLE) {
       return yieldableState;
@@ -96,7 +96,7 @@ public class LayerCacheUtils {
   /** @return number of actually collected, which may be less than or equals to rowsNumber */
   public static YieldableState yieldRows(
       IUDFInputDataSet source, ElasticSerializableRowRecordList target, int rowsNumber)
-      throws QueryProcessException, IOException {
+      throws Exception {
     int count = 0;
     while (count < rowsNumber) {
       final YieldableState yieldableState = yieldRow(source, target);
@@ -109,9 +109,9 @@ public class LayerCacheUtils {
   }
 
   public static YieldableState yieldRow(
-      IUDFInputDataSet source, ElasticSerializableRowRecordList target)
-      throws IOException, QueryProcessException {
+      IUDFInputDataSet source, ElasticSerializableRowRecordList target) throws Exception {
     final YieldableState yieldableState = source.canYieldNextRowInObjects();
+
     if (yieldableState == YieldableState.YIELDABLE) {
       target.put(source.nextRowInObjects());
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSink.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSink.java
index 2ccc9bf8ee..7b28462fe6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSink.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSink.java
@@ -73,6 +73,11 @@ public class StubSink implements ISink {
     instanceContext.transitionToFlushing();
   }
 
+  @Override
+  public boolean isClosed() {
+    return closed;
+  }
+
   @Override
   public boolean isAborted() {
     return closed;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index ca119a07ee..0147d33c3a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -59,7 +59,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -97,8 +96,7 @@ public class AggregationOperatorTest {
 
   /** Try to aggregate unary intermediate result of one time series without group by interval. */
   @Test
-  public void testAggregateIntermediateResult1()
-      throws IllegalPathException, ExecutionException, InterruptedException {
+  public void testAggregateIntermediateResult1() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.COUNT);
     aggregationTypes.add(TAggregationType.SUM);
@@ -122,7 +120,9 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = null;
+      resultTsBlock = aggregationOperator.next();
+
       if (resultTsBlock == null) {
         continue;
       }
@@ -139,8 +139,7 @@ public class AggregationOperatorTest {
 
   /** Try to aggregate binary intermediate result of one time series without group by interval. */
   @Test
-  public void testAggregateIntermediateResult2()
-      throws IllegalPathException, ExecutionException, InterruptedException {
+  public void testAggregateIntermediateResult2() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.AVG);
     aggregationTypes.add(TAggregationType.FIRST_VALUE);
@@ -163,7 +162,8 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = null;
+      resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -176,8 +176,7 @@ public class AggregationOperatorTest {
   }
 
   @Test
-  public void testGroupByIntermediateResult1()
-      throws IllegalPathException, ExecutionException, InterruptedException {
+  public void testGroupByIntermediateResult1() throws Exception {
     int[][] result =
         new int[][] {
           {100, 100, 100, 99},
@@ -211,7 +210,8 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = null;
+      resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -231,8 +231,7 @@ public class AggregationOperatorTest {
   }
 
   @Test
-  public void testGroupByIntermediateResult2()
-      throws IllegalPathException, ExecutionException, InterruptedException {
+  public void testGroupByIntermediateResult2() throws Exception {
     double[][] result =
         new double[][] {
           {20049.5, 20149.5, 6249.5, 8429.808},
@@ -262,7 +261,8 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = null;
+      resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 6d45de1d7f..f5d7e9a003 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -95,7 +95,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+  public void testAggregationWithoutTimeFilter() throws Exception {
     List<Aggregator> aggregators = new ArrayList<>();
     for (int i = 0; i < measurementSchemas.size(); i++) {
       TSDataType dataType = measurementSchemas.get(i).getType();
@@ -126,7 +126,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws IllegalPathException {
+  public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws Exception {
     List<Aggregator> aggregators = new ArrayList<>();
     for (int i = 0; i < measurementSchemas.size(); i++) {
       TSDataType dataType = measurementSchemas.get(i).getType();
@@ -153,11 +153,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
       }
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testMultiAggregationFuncWithoutTimeFilter1() throws IllegalPathException {
+  public void testMultiAggregationFuncWithoutTimeFilter1() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.COUNT);
     aggregationTypes.add(TAggregationType.SUM);
@@ -190,7 +191,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testMultiAggregationFuncWithoutTimeFilter2() throws IllegalPathException {
+  public void testMultiAggregationFuncWithoutTimeFilter2() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.FIRST_VALUE);
     aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -231,8 +232,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc()
-      throws IllegalPathException {
+  public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.FIRST_VALUE);
     aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -273,7 +273,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testAggregationWithTimeFilter1() throws IllegalPathException {
+  public void testAggregationWithTimeFilter1() throws Exception {
     List<Aggregator> aggregators = new ArrayList<>();
     for (int i = 0; i < measurementSchemas.size(); i++) {
       TSDataType dataType = measurementSchemas.get(i).getType();
@@ -301,11 +301,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
       }
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testAggregationWithTimeFilter2() throws IllegalPathException {
+  public void testAggregationWithTimeFilter2() throws Exception {
     Filter timeFilter = TimeFilter.ltEq(379);
     List<Aggregator> aggregators = new ArrayList<>();
     for (int i = 0; i < measurementSchemas.size(); i++) {
@@ -333,11 +334,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
       }
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testAggregationWithTimeFilter3() throws IllegalPathException {
+  public void testAggregationWithTimeFilter3() throws Exception {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
     List<Aggregator> aggregators = new ArrayList<>();
     for (int i = 0; i < measurementSchemas.size(); i++) {
@@ -369,7 +371,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testMultiAggregationWithTimeFilter() throws IllegalPathException {
+  public void testMultiAggregationWithTimeFilter() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.FIRST_VALUE);
     aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -407,11 +409,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
       assertEquals(399, resultTsBlock.getColumn(5).getLong(0));
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
+  public void testGroupByWithoutGlobalTimeFilter() throws Exception {
     int[] result = new int[] {100, 100, 100, 99};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     List<Aggregator> aggregators = new ArrayList<>();
@@ -444,11 +447,12 @@ public class AlignedSeriesAggregationScanOperatorTest {
         count++;
       }
     }
+
     assertEquals(4, count);
   }
 
   @Test
-  public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
+  public void testGroupByWithGlobalTimeFilter() throws Exception {
     int[] result = new int[] {0, 80, 100, 80};
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
@@ -487,7 +491,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testGroupByWithMultiFunction() throws IllegalPathException {
+  public void testGroupByWithMultiFunction() throws Exception {
     int[][] result =
         new int[][] {
           {20000, 20100, 10200, 10300},
@@ -530,7 +534,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
+  public void testGroupByWithMultiFunctionOrderByTimeDesc() throws Exception {
     int[][] result =
         new int[][] {
           {20000, 20100, 10200, 10300},
@@ -573,7 +577,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testGroupBySlidingTimeWindow() throws IllegalPathException {
+  public void testGroupBySlidingTimeWindow() throws Exception {
     int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 49};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
     List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
@@ -603,7 +607,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
+  public void testGroupBySlidingTimeWindow2() throws Exception {
     int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
     int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
@@ -634,7 +638,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testGroupBySlidingWindowWithMultiFunction() throws IllegalPathException {
+  public void testGroupBySlidingWindowWithMultiFunction() throws Exception {
     int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
     int[][] result =
         new int[][] {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
index 894ef7c8c3..5366412433 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -96,7 +96,7 @@ public class AlignedSeriesScanOperatorTest {
   }
 
   @Test
-  public void batchTest1() {
+  public void batchTest1() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -173,7 +173,7 @@ public class AlignedSeriesScanOperatorTest {
   }
 
   @Test
-  public void batchTest2() {
+  public void batchTest2() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -462,7 +462,7 @@ public class AlignedSeriesScanOperatorTest {
 
   /** order by time desc */
   @Test
-  public void batchTest3() {
+  public void batchTest3() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
deleted file mode 100644
index 9960a40286..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.execution.operator;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import io.airlift.units.Duration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class DeviceMergeOperatorTest {
-
-  private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceMergeOperatorTest";
-  private final List<String> deviceIds = new ArrayList<>();
-  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
-
-  private final List<TsFileResource> seqResources = new ArrayList<>();
-  private final List<TsFileResource> unSeqResources = new ArrayList<>();
-
-  @Before
-  public void setUp() throws MetadataException, IOException, WriteProcessException {
-    SeriesReaderTestUtil.setUp(
-        measurementSchemas, deviceIds, seqResources, unSeqResources, DEVICE_MERGE_OPERATOR_TEST_SG);
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
-  }
-
-  /**
-   * Construct DeviceMergeOperator with different devices in two DeviceViewOperators.
-   *
-   * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0]],
-   *
-   * <p>DeviceViewOperator2: [seriesScanOperator: [device1.sensor1]]
-   *
-   * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
-   * and the sensor0 column of device1 should be null.
-   */
-  @Test
-  public void deviceMergeOperatorTest() {
-    ExecutorService instanceNotificationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
-    try {
-      // Construct operator tree
-      QueryId queryId = new QueryId("stub_query");
-      FragmentInstanceId instanceId =
-          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-      FragmentInstanceStateMachine stateMachine =
-          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
-      FragmentInstanceContext fragmentInstanceContext =
-          createFragmentInstanceContext(instanceId, stateMachine);
-      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
-      PlanNodeId planNodeId1 = new PlanNodeId("1");
-      driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId2 = new PlanNodeId("2");
-      driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId3 = new PlanNodeId("3");
-      driverContext.addOperatorContext(
-          3, planNodeId3, DeviceViewOperatorTest.class.getSimpleName());
-      PlanNodeId planNodeId4 = new PlanNodeId("4");
-      driverContext.addOperatorContext(
-          4, planNodeId4, DeviceViewOperatorTest.class.getSimpleName());
-      PlanNodeId planNodeId5 = new PlanNodeId("5");
-      driverContext.addOperatorContext(5, planNodeId5, DeviceMergeOperator.class.getSimpleName());
-
-      List<TSDataType> dataTypes = new ArrayList<>();
-      dataTypes.add(TSDataType.TEXT);
-      dataTypes.add(TSDataType.INT32);
-      dataTypes.add(TSDataType.INT32);
-      MeasurementPath measurementPath1 =
-          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
-
-      SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
-      scanOptionsBuilder.withAllSensors(Collections.singleton("sensor0"));
-      SeriesScanOperator seriesScanOperator1 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(0),
-              planNodeId1,
-              measurementPath1,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      DeviceViewOperator deviceViewOperator1 =
-          new DeviceViewOperator(
-              driverContext.getOperatorContexts().get(2),
-              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
-              Collections.singletonList(seriesScanOperator1),
-              Collections.singletonList(Collections.singletonList(1)),
-              dataTypes);
-
-      MeasurementPath measurementPath2 =
-          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
-      scanOptionsBuilder.withAllSensors(Collections.singleton("sensor1"));
-      SeriesScanOperator seriesScanOperator2 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(1),
-              planNodeId2,
-              measurementPath2,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      DeviceViewOperator deviceViewOperator2 =
-          new DeviceViewOperator(
-              driverContext.getOperatorContexts().get(3),
-              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1"),
-              Collections.singletonList(seriesScanOperator2),
-              Collections.singletonList(Collections.singletonList(2)),
-              dataTypes);
-
-      List<String> devices = new ArrayList<>();
-      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
-      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
-      List<Operator> deviceOperators = new ArrayList<>();
-      deviceOperators.add(deviceViewOperator1);
-      deviceOperators.add(deviceViewOperator2);
-      DeviceMergeOperator deviceMergeOperator =
-          new DeviceMergeOperator(
-              driverContext.getOperatorContexts().get(4),
-              devices,
-              deviceOperators,
-              dataTypes,
-              new TimeSelector(500, true),
-              new AscTimeComparator());
-
-      int count = 0;
-      while (deviceMergeOperator.hasNext()) {
-        TsBlock tsBlock = deviceMergeOperator.next();
-        if (tsBlock == null) {
-          continue;
-        }
-        assertEquals(3, tsBlock.getValueColumnCount());
-        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
-          long expectedTime = count % 500;
-          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
-          assertEquals(
-              count < 500
-                  ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
-                  : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
-              tsBlock.getColumn(0).getBinary(i).getStringValue());
-          if (expectedTime < 200) {
-            if (!tsBlock.getColumn(1).isNull(i)) {
-              assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
-              assertTrue(tsBlock.getColumn(2).isNull(i));
-            } else {
-              assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
-            }
-          } else if (expectedTime < 260
-              || (expectedTime >= 300 && expectedTime < 380)
-              || expectedTime >= 400) {
-            if (!tsBlock.getColumn(1).isNull(i)) {
-              assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
-              assertTrue(tsBlock.getColumn(2).isNull(i));
-            } else {
-              assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
-            }
-          } else {
-            if (!tsBlock.getColumn(1).isNull(i)) {
-              assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
-              assertTrue(tsBlock.getColumn(2).isNull(i));
-            } else {
-              assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
-            }
-          }
-        }
-      }
-      assertEquals(1000, count);
-    } catch (IllegalPathException e) {
-      e.printStackTrace();
-      fail();
-    }
-  }
-
-  /**
-   * Construct DeviceMergeOperator with the same device in two DeviceViewOperators.
-   *
-   * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0]],
-   *
-   * <p>DeviceViewOperator2: [seriesScanOperator: [device0.sensor0]]
-   *
-   * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
-   * and the sensor0 column of device1 should be null.
-   */
-  @Test
-  public void deviceMergeOperatorTest2() {
-    ExecutorService instanceNotificationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
-    try {
-      // Construct operator tree
-      QueryId queryId = new QueryId("stub_query");
-      FragmentInstanceId instanceId =
-          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-      FragmentInstanceStateMachine stateMachine =
-          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
-      FragmentInstanceContext fragmentInstanceContext =
-          createFragmentInstanceContext(instanceId, stateMachine);
-      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
-      PlanNodeId planNodeId1 = new PlanNodeId("1");
-      driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId2 = new PlanNodeId("2");
-      driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId3 = new PlanNodeId("3");
-      driverContext.addOperatorContext(
-          3, planNodeId3, DeviceViewOperatorTest.class.getSimpleName());
-      PlanNodeId planNodeId4 = new PlanNodeId("4");
-      driverContext.addOperatorContext(
-          4, planNodeId4, DeviceViewOperatorTest.class.getSimpleName());
-      PlanNodeId planNodeId5 = new PlanNodeId("5");
-      driverContext.addOperatorContext(5, planNodeId5, DeviceMergeOperator.class.getSimpleName());
-
-      List<TSDataType> dataTypes = new ArrayList<>();
-      dataTypes.add(TSDataType.TEXT);
-      dataTypes.add(TSDataType.INT32);
-      MeasurementPath measurementPath1 =
-          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
-      SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
-      scanOptionsBuilder.withAllSensors(Collections.singleton("sensor0"));
-      SeriesScanOperator seriesScanOperator1 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(0),
-              planNodeId1,
-              measurementPath1,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      List<TsFileResource> seqResources1 = new ArrayList<>();
-      List<TsFileResource> unSeqResources1 = new ArrayList<>();
-      seqResources1.add(seqResources.get(0));
-      seqResources1.add(seqResources.get(1));
-      seqResources1.add(seqResources.get(3));
-      unSeqResources1.add(unSeqResources.get(0));
-      unSeqResources1.add(unSeqResources.get(1));
-      unSeqResources1.add(unSeqResources.get(3));
-      unSeqResources1.add(unSeqResources.get(5));
-      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources1, unSeqResources1));
-      seriesScanOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      DeviceViewOperator deviceViewOperator1 =
-          new DeviceViewOperator(
-              driverContext.getOperatorContexts().get(2),
-              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
-              Collections.singletonList(seriesScanOperator1),
-              Collections.singletonList(Collections.singletonList(1)),
-              dataTypes);
-
-      SeriesScanOperator seriesScanOperator2 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(1),
-              planNodeId2,
-              measurementPath1,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      List<TsFileResource> seqResources2 = new ArrayList<>();
-      List<TsFileResource> unSeqResources2 = new ArrayList<>();
-      seqResources2.add(seqResources.get(2));
-      seqResources2.add(seqResources.get(4));
-      unSeqResources2.add(unSeqResources.get(2));
-      unSeqResources2.add(unSeqResources.get(4));
-      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources2, unSeqResources2));
-      DeviceViewOperator deviceViewOperator2 =
-          new DeviceViewOperator(
-              driverContext.getOperatorContexts().get(3),
-              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
-              Collections.singletonList(seriesScanOperator2),
-              Collections.singletonList(Collections.singletonList(1)),
-              dataTypes);
-
-      List<String> devices = new ArrayList<>();
-      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
-      List<Operator> deviceOperators = new ArrayList<>();
-      deviceOperators.add(deviceViewOperator1);
-      deviceOperators.add(deviceViewOperator2);
-      DeviceMergeOperator deviceMergeOperator =
-          new DeviceMergeOperator(
-              driverContext.getOperatorContexts().get(4),
-              devices,
-              deviceOperators,
-              dataTypes,
-              new TimeSelector(500, true),
-              new AscTimeComparator());
-
-      int count = 0;
-      while (deviceMergeOperator.hasNext()) {
-        TsBlock tsBlock = deviceMergeOperator.next();
-        if (tsBlock == null) {
-          continue;
-        }
-        assertEquals(2, tsBlock.getValueColumnCount());
-        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
-          assertEquals(count, tsBlock.getTimeByIndex(i));
-          assertEquals(
-              DEVICE_MERGE_OPERATOR_TEST_SG + ".device0",
-              tsBlock.getColumn(0).getBinary(i).getStringValue());
-          if ((long) count < 200) {
-            assertEquals(20000 + (long) count, tsBlock.getColumn(1).getInt(i));
-          } else if ((long) count < 260
-              || ((long) count >= 300 && (long) count < 380)
-              || (long) count >= 400) {
-            assertEquals(10000 + (long) count, tsBlock.getColumn(1).getInt(i));
-          } else {
-            assertEquals(count, tsBlock.getColumn(1).getInt(i));
-          }
-        }
-      }
-      assertEquals(500, count);
-    } catch (IllegalPathException e) {
-      e.printStackTrace();
-      fail();
-    }
-  }
-
-  /**
-   * Construct DeviceMergeOperator with the same and different device at the same time in two
-   * DeviceViewOperators.
-   *
-   * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0], [device1.sensor1]],
-   *
-   * <p>DeviceViewOperator2: [seriesScanOperator: [device0.sensor0]]
-   *
-   * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
-   * and the sensor0 column of device1 should be null.
-   */
-  @Test
-  public void deviceMergeOperatorTest3() {
-    ExecutorService instanceNotificationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
-    try {
-      // Construct operator tree
-      QueryId queryId = new QueryId("stub_query");
-      FragmentInstanceId instanceId =
-          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
-      FragmentInstanceStateMachine stateMachine =
-          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
-      FragmentInstanceContext fragmentInstanceContext =
-          createFragmentInstanceContext(instanceId, stateMachine);
-      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
-      PlanNodeId planNodeId1 = new PlanNodeId("1");
-      driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId2 = new PlanNodeId("2");
-      driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId3 = new PlanNodeId("3");
-      driverContext.addOperatorContext(3, planNodeId3, SeriesScanOperator.class.getSimpleName());
-      driverContext.addOperatorContext(
-          4, new PlanNodeId("4"), DeviceViewOperatorTest.class.getSimpleName());
-      driverContext.addOperatorContext(
-          5, new PlanNodeId("5"), DeviceViewOperatorTest.class.getSimpleName());
-      driverContext.addOperatorContext(
-          6, new PlanNodeId("6"), DeviceMergeOperator.class.getSimpleName());
-
-      List<TSDataType> dataTypes = new ArrayList<>();
-      dataTypes.add(TSDataType.TEXT);
-      dataTypes.add(TSDataType.INT32);
-      dataTypes.add(TSDataType.INT32);
-      MeasurementPath measurementPath1 =
-          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
-      SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
-      scanOptionsBuilder.withAllSensors(Collections.singleton("sensor0"));
-      SeriesScanOperator seriesScanOperator1 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(0),
-              planNodeId1,
-              measurementPath1,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      List<TsFileResource> seqResources1 = new ArrayList<>();
-      List<TsFileResource> unSeqResources1 = new ArrayList<>();
-      seqResources1.add(seqResources.get(0));
-      seqResources1.add(seqResources.get(1));
-      seqResources1.add(seqResources.get(3));
-      unSeqResources1.add(unSeqResources.get(0));
-      unSeqResources1.add(unSeqResources.get(1));
-      unSeqResources1.add(unSeqResources.get(3));
-      unSeqResources1.add(unSeqResources.get(5));
-      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources1, unSeqResources1));
-      seriesScanOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      MeasurementPath measurementPath2 =
-          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
-      scanOptionsBuilder.withAllSensors(Collections.singleton("sensor1"));
-      SeriesScanOperator seriesScanOperator2 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(1),
-              planNodeId2,
-              measurementPath2,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      List<String> devices = new ArrayList<>();
-      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
-      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
-      List<Operator> deviceOperators = new ArrayList<>();
-      deviceOperators.add(seriesScanOperator1);
-      deviceOperators.add(seriesScanOperator2);
-      List<List<Integer>> deviceColumnIndex = new ArrayList<>();
-      deviceColumnIndex.add(Collections.singletonList(1));
-      deviceColumnIndex.add(Collections.singletonList(2));
-      DeviceViewOperator deviceViewOperator1 =
-          new DeviceViewOperator(
-              driverContext.getOperatorContexts().get(3),
-              devices,
-              deviceOperators,
-              deviceColumnIndex,
-              dataTypes);
-
-      scanOptionsBuilder.withAllSensors(Collections.singleton("sensor0"));
-      SeriesScanOperator seriesScanOperator3 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(2),
-              planNodeId3,
-              measurementPath1,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator3
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      List<TsFileResource> seqResources2 = new ArrayList<>();
-      List<TsFileResource> unSeqResources2 = new ArrayList<>();
-      seqResources2.add(seqResources.get(2));
-      seqResources2.add(seqResources.get(4));
-      unSeqResources2.add(unSeqResources.get(2));
-      unSeqResources2.add(unSeqResources.get(4));
-      seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources2, unSeqResources2));
-      DeviceViewOperator deviceViewOperator2 =
-          new DeviceViewOperator(
-              driverContext.getOperatorContexts().get(4),
-              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
-              Collections.singletonList(seriesScanOperator3),
-              Collections.singletonList(Collections.singletonList(1)),
-              dataTypes);
-
-      List<Operator> deviceViewOperators = new ArrayList<>();
-      deviceViewOperators.add(deviceViewOperator1);
-      deviceViewOperators.add(deviceViewOperator2);
-      DeviceMergeOperator deviceMergeOperator =
-          new DeviceMergeOperator(
-              driverContext.getOperatorContexts().get(5),
-              devices,
-              deviceViewOperators,
-              dataTypes,
-              new TimeSelector(500, true),
-              new AscTimeComparator());
-
-      int count = 0;
-      while (deviceMergeOperator.hasNext()) {
-        TsBlock tsBlock = deviceMergeOperator.next();
-        if (tsBlock == null) {
-          continue;
-        }
-        assertEquals(3, tsBlock.getValueColumnCount());
-        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
-          long expectedTime = count % 500;
-          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
-          assertEquals(
-              count < 500
-                  ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
-                  : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
-              tsBlock.getColumn(0).getBinary(i).getStringValue());
-          if (expectedTime < 200) {
-            if (!tsBlock.getColumn(1).isNull(i)) {
-              assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
-              assertTrue(tsBlock.getColumn(2).isNull(i));
-            } else {
-              assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
-            }
-          } else if (expectedTime < 260
-              || (expectedTime >= 300 && expectedTime < 380)
-              || expectedTime >= 400) {
-            if (!tsBlock.getColumn(1).isNull(i)) {
-              assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
-              assertTrue(tsBlock.getColumn(2).isNull(i));
-            } else {
-              assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
-            }
-          } else {
-            if (!tsBlock.getColumn(1).isNull(i)) {
-              assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
-              assertTrue(tsBlock.getColumn(2).isNull(i));
-            } else {
-              assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
-            }
-          }
-        }
-      }
-      assertEquals(1000, count);
-    } catch (IllegalPathException e) {
-      e.printStackTrace();
-      fail();
-    }
-  }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
index 9e53ab2fe0..b524825fbb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -195,7 +194,7 @@ public class DeviceViewOperatorTest {
         }
       }
       assertEquals(1000, count);
-    } catch (IllegalPathException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
index af84ea03a8..75da328970 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
 public class FillOperatorTest {
 
   @Test
-  public void batchConstantFillTest() {
+  public void batchConstantFillTest() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -80,7 +80,7 @@ public class FillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   int delta = index * 10000;
                   TsBlockBuilder builder =
                       new TsBlockBuilder(
@@ -122,12 +122,12 @@ public class FillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 3;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 3;
                 }
 
@@ -220,7 +220,7 @@ public class FillOperatorTest {
   }
 
   @Test
-  public void batchPreviousFillTest() {
+  public void batchPreviousFillTest() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -250,7 +250,7 @@ public class FillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   int delta = index * 10000;
                   TsBlockBuilder builder =
                       new TsBlockBuilder(
@@ -291,12 +291,12 @@ public class FillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 3;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 3;
                 }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/HorizontallyConcatOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/HorizontallyConcatOperatorTest.java
index 1430fc4a11..15e803e7f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/HorizontallyConcatOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/HorizontallyConcatOperatorTest.java
@@ -92,7 +92,7 @@ public class HorizontallyConcatOperatorTest {
   }
 
   @Test
-  public void batchTest1() {
+  public void batchTest1() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index c409373a4d..8a4862fa4a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -63,7 +63,7 @@ public class LastQueryMergeOperatorTest {
   }
 
   @Test
-  public void testLastQueryMergeOperatorDesc() {
+  public void testLastQueryMergeOperatorDesc() throws Exception {
 
     QueryId queryId = new QueryId("stub_query");
     FragmentInstanceId instanceId =
@@ -104,7 +104,7 @@ public class LastQueryMergeOperatorTest {
           }
 
           @Override
-          public TsBlock next() {
+          public TsBlock next() throws Exception {
             TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(4);
             for (int i = timeArray[index].length - 1; i >= 0; i--) {
               LastQueryUtil.appendLastValue(
@@ -119,12 +119,12 @@ public class LastQueryMergeOperatorTest {
           }
 
           @Override
-          public boolean hasNext() {
+          public boolean hasNext() throws Exception {
             return index >= 0;
           }
 
           @Override
-          public boolean isFinished() {
+          public boolean isFinished() throws Exception {
             return !hasNext();
           }
 
@@ -168,7 +168,7 @@ public class LastQueryMergeOperatorTest {
           }
 
           @Override
-          public TsBlock next() {
+          public TsBlock next() throws Exception {
             TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(4);
             for (int i = timeArray[index].length - 1; i >= 0; i--) {
               LastQueryUtil.appendLastValue(
@@ -183,12 +183,12 @@ public class LastQueryMergeOperatorTest {
           }
 
           @Override
-          public boolean hasNext() {
+          public boolean hasNext() throws Exception {
             return index >= 0;
           }
 
           @Override
-          public boolean isFinished() {
+          public boolean isFinished() throws Exception {
             return !hasNext();
           }
 
@@ -241,7 +241,9 @@ public class LastQueryMergeOperatorTest {
     int count = timeArray.length - 1;
     while (!lastQueryMergeOperator.isFinished()) {
       assertTrue(lastQueryMergeOperator.isBlocked().isDone());
-      TsBlock result = lastQueryMergeOperator.next();
+      TsBlock result = null;
+      result = lastQueryMergeOperator.next();
+
       if (result == null) {
         continue;
       }
@@ -255,11 +257,12 @@ public class LastQueryMergeOperatorTest {
         count--;
       }
     }
+
     assertEquals(-1, count);
   }
 
   @Test
-  public void testLastQueryMergeOperatorAsc() {
+  public void testLastQueryMergeOperatorAsc() throws Exception {
 
     QueryId queryId = new QueryId("stub_query");
     FragmentInstanceId instanceId =
@@ -300,7 +303,7 @@ public class LastQueryMergeOperatorTest {
           }
 
           @Override
-          public TsBlock next() {
+          public TsBlock next() throws Exception {
             TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(4);
             for (int i = 0, size = timeArray[index].length; i < size; i++) {
               LastQueryUtil.appendLastValue(
@@ -315,12 +318,13 @@ public class LastQueryMergeOperatorTest {
           }
 
           @Override
-          public boolean hasNext() {
+          public boolean hasNext() throws Exception {
             return index < 2;
           }
 
           @Override
-          public boolean isFinished() {
+          public boolean isFinished() throws Exception {
+
             return !hasNext();
           }
 
@@ -364,7 +368,7 @@ public class LastQueryMergeOperatorTest {
           }
 
           @Override
-          public TsBlock next() {
+          public TsBlock next() throws Exception {
             TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(4);
             for (int i = 0, size = timeArray[index].length; i < size; i++) {
               LastQueryUtil.appendLastValue(
@@ -379,12 +383,13 @@ public class LastQueryMergeOperatorTest {
           }
 
           @Override
-          public boolean hasNext() {
+          public boolean hasNext() throws Exception {
             return index < 2;
           }
 
           @Override
-          public boolean isFinished() {
+          public boolean isFinished() throws Exception {
+
             return !hasNext();
           }
 
@@ -437,7 +442,8 @@ public class LastQueryMergeOperatorTest {
     int count = 0;
     while (!lastQueryMergeOperator.isFinished()) {
       assertTrue(lastQueryMergeOperator.isBlocked().isDone());
-      TsBlock result = lastQueryMergeOperator.next();
+      TsBlock result = null;
+      result = lastQueryMergeOperator.next();
       if (result == null) {
         continue;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index 5d387029ce..e3141272de 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -90,7 +90,7 @@ public class LastQueryOperatorTest {
   }
 
   @Test
-  public void testLastQueryOperator1() {
+  public void testLastQueryOperator1() throws Exception {
     try {
       List<Aggregator> aggregators1 = LastQueryUtil.createAggregators(TSDataType.INT32);
       MeasurementPath measurementPath1 =
@@ -341,7 +341,7 @@ public class LastQueryOperatorTest {
         }
       }
 
-    } catch (IllegalPathException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
index ea669cf1de..76b26258e6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQuerySortOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -209,7 +208,7 @@ public class LastQuerySortOperatorTest {
         }
       }
 
-    } catch (IllegalPathException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
@@ -343,7 +342,7 @@ public class LastQuerySortOperatorTest {
         }
       }
 
-    } catch (IllegalPathException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
index 79a4d56445..a69d41765d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
@@ -86,7 +86,7 @@ public class LimitOperatorTest {
   }
 
   @Test
-  public void batchTest() {
+  public void batchTest() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 6bac123f24..877510fe2e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertTrue;
 public class LinearFillOperatorTest {
 
   @Test
-  public void batchLinearFillTest1() {
+  public void batchLinearFillTest1() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -131,7 +131,7 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   TsBlockBuilder builder =
                       new TsBlockBuilder(
                           ImmutableList.of(
@@ -155,12 +155,12 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 3;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 3;
                 }
 
@@ -261,7 +261,7 @@ public class LinearFillOperatorTest {
   }
 
   @Test
-  public void batchLinearFillTest1OrderByDesc() {
+  public void batchLinearFillTest1OrderByDesc() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -344,7 +344,7 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   TsBlockBuilder builder =
                       new TsBlockBuilder(
                           ImmutableList.of(
@@ -368,12 +368,12 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 3;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 3;
                 }
 
@@ -474,7 +474,7 @@ public class LinearFillOperatorTest {
   }
 
   @Test
-  public void batchLinearFillTest2() {
+  public void batchLinearFillTest2() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -557,7 +557,7 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   TsBlockBuilder builder =
                       new TsBlockBuilder(
                           ImmutableList.of(
@@ -581,12 +581,12 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 3;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 3;
                 }
 
@@ -687,7 +687,7 @@ public class LinearFillOperatorTest {
   }
 
   @Test
-  public void batchLinearFillTest2OrderByDesc() {
+  public void batchLinearFillTest2OrderByDesc() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -770,7 +770,7 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   TsBlockBuilder builder =
                       new TsBlockBuilder(
                           ImmutableList.of(
@@ -794,12 +794,12 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 3;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 3;
                 }
 
@@ -900,7 +900,7 @@ public class LinearFillOperatorTest {
   }
 
   @Test
-  public void batchLinearFillTest3() {
+  public void batchLinearFillTest3() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -937,7 +937,7 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.FLOAT));
                   for (int i = 0; i < 1; i++) {
                     builder.getTimeColumnBuilder().writeLong(i + index);
@@ -955,12 +955,12 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 7;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 7;
                 }
 
@@ -1020,7 +1020,7 @@ public class LinearFillOperatorTest {
   }
 
   @Test
-  public void batchLinearFillTest3OrderByDesc() {
+  public void batchLinearFillTest3OrderByDesc() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -1057,7 +1057,7 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.FLOAT));
                   for (int i = 0; i < 1; i++) {
                     builder.getTimeColumnBuilder().writeLong(i + (6 - index));
@@ -1075,12 +1075,12 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 7;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 7;
                 }
 
@@ -1140,7 +1140,7 @@ public class LinearFillOperatorTest {
   }
 
   @Test
-  public void batchLinearFillBooleanTest() {
+  public void batchLinearFillBooleanTest() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -1177,7 +1177,7 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public TsBlock next() {
+                public TsBlock next() throws Exception {
                   TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.BOOLEAN));
                   for (int i = 0; i < 1; i++) {
                     builder.getTimeColumnBuilder().writeLong(i + index);
@@ -1195,12 +1195,12 @@ public class LinearFillOperatorTest {
                 }
 
                 @Override
-                public boolean hasNext() {
+                public boolean hasNext() throws Exception {
                   return index < 7;
                 }
 
                 @Override
-                public boolean isFinished() {
+                public boolean isFinished() throws Exception {
                   return index >= 7;
                 }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index 73e93e6f67..f91bc21956 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -342,7 +342,7 @@ public class MergeSortOperatorTest {
   }
 
   @Test
-  public void testOrderByTime1() {
+  public void testOrderByTime1() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.ASC, Ordering.ASC);
     long lastTime = -1;
     int checkDevice = 0;
@@ -385,11 +385,12 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 1500);
   }
 
   @Test
-  public void testOrderByTime2() {
+  public void testOrderByTime2() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.ASC, Ordering.DESC);
     long lastTime = -1;
     int checkDevice = 0;
@@ -432,11 +433,12 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 1500);
   }
 
   @Test
-  public void testOrderByTime3() {
+  public void testOrderByTime3() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.DESC, Ordering.DESC);
     long lastTime = Long.MAX_VALUE;
     int checkDevice = 0;
@@ -479,11 +481,12 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 1500);
   }
 
   @Test
-  public void testOrderByTime4() {
+  public void testOrderByTime4() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.DESC, Ordering.ASC);
     long lastTime = Long.MAX_VALUE;
     int checkDevice = 0;
@@ -526,6 +529,7 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 1500);
   }
 
@@ -835,7 +839,7 @@ public class MergeSortOperatorTest {
   }
 
   @Test
-  public void testOrderByTime1_2() {
+  public void testOrderByTime1_2() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.ASC, Ordering.ASC);
     long lastTime = -1;
     int checkDevice = 0;
@@ -874,15 +878,17 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 2000);
   }
 
   @Test
-  public void testOrderByTime2_2() {
+  public void testOrderByTime2_2() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.ASC, Ordering.DESC);
     long lastTime = -1;
     int checkDevice = 0;
     int count = 0;
+
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
       if (tsBlock == null) continue;
@@ -917,11 +923,12 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 2000);
   }
 
   @Test
-  public void testOrderByTime3_2() {
+  public void testOrderByTime3_2() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.DESC, Ordering.DESC);
     long lastTime = Long.MAX_VALUE;
     int checkDevice = 0;
@@ -960,11 +967,12 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 2000);
   }
 
   @Test
-  public void testOrderByTime4_2() {
+  public void testOrderByTime4_2() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.DESC, Ordering.ASC);
     long lastTime = Long.MAX_VALUE;
     int checkDevice = 0;
@@ -1003,6 +1011,7 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 2000);
   }
   // ------------------------------------------------------------------------------------------------
@@ -1274,7 +1283,7 @@ public class MergeSortOperatorTest {
   }
 
   @Test
-  public void testOrderByDevice1() {
+  public void testOrderByDevice1() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.ASC, Ordering.ASC);
     long lastTime = -1;
     int checkDevice = 0;
@@ -1322,11 +1331,12 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 2000);
   }
 
   @Test
-  public void testOrderByDevice2() {
+  public void testOrderByDevice2() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.ASC, Ordering.DESC);
     long lastTime = -1;
     int checkDevice = 0;
@@ -1374,15 +1384,17 @@ public class MergeSortOperatorTest {
         }
       }
     }
+
     assertEquals(count, 2000);
   }
 
   @Test
-  public void testOrderByDevice3() {
+  public void testOrderByDevice3() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.DESC, Ordering.ASC);
     long lastTime = Long.MAX_VALUE;
     int checkDevice = 0;
     int count = 0;
+
     while (mergeSortOperator.isBlocked().isDone() && mergeSortOperator.hasNext()) {
       TsBlock tsBlock = mergeSortOperator.next();
       if (tsBlock == null) continue;
@@ -1430,7 +1442,7 @@ public class MergeSortOperatorTest {
   }
 
   @Test
-  public void testOrderByDevice4() {
+  public void testOrderByDevice4() throws Exception {
     MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.DESC, Ordering.DESC);
     long lastTime = Long.MAX_VALUE;
     int checkDevice = 0;
@@ -1493,7 +1505,7 @@ public class MergeSortOperatorTest {
   //                        ShowQueriesOperator      ShowQueriesOperator
   // ------------------------------------------------------------------------------------------------
   @Test
-  public void mergeSortWithSortOperatorTest() {
+  public void mergeSortWithSortOperatorTest() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
index 16747a70ea..b4a098282e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -87,7 +87,7 @@ public class OffsetOperatorTest {
   }
 
   @Test
-  public void batchTest1() {
+  public void batchTest1() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -192,7 +192,7 @@ public class OffsetOperatorTest {
 
   /** offset is 0 in which case we will get all data */
   @Test
-  public void batchTest2() {
+  public void batchTest2() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -294,7 +294,7 @@ public class OffsetOperatorTest {
 
   /** offset is larger than max row number in which case we will get no data */
   @Test
-  public void batchTest3() {
+  public void batchTest3() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index aa333914a9..3f3fc62d27 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -114,7 +114,7 @@ public class RawDataAggregationOperatorTest {
    * always used with value filter.
    */
   @Test
-  public void aggregateRawDataTest1() throws IllegalPathException {
+  public void aggregateRawDataTest1() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     List<List<InputLocation[]>> inputLocations = new ArrayList<>();
     for (int i = 0; i < 2; i++) {
@@ -165,7 +165,7 @@ public class RawDataAggregationOperatorTest {
    * always used with value filter.
    */
   @Test
-  public void aggregateRawDataTest2() throws IllegalPathException {
+  public void aggregateRawDataTest2() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     List<List<InputLocation[]>> inputLocations = new ArrayList<>();
     for (int i = 0; i < 2; i++) {
@@ -209,12 +209,13 @@ public class RawDataAggregationOperatorTest {
       }
       count++;
     }
+
     assertEquals(1, count);
   }
 
   /** Test aggregating raw data by time interval. */
   @Test
-  public void groupByTimeRawDataTest1() throws IllegalPathException {
+  public void groupByTimeRawDataTest1() throws Exception {
     int[][] result =
         new int[][] {
           {100, 100, 100, 99},
@@ -247,6 +248,7 @@ public class RawDataAggregationOperatorTest {
         initRawDataAggregationOperator(
             aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -265,12 +267,13 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(4, count);
   }
 
   /** Test aggregating raw data by time interval. */
   @Test
-  public void groupByTimeRawDataTest2() throws IllegalPathException {
+  public void groupByTimeRawDataTest2() throws Exception {
     double[][] result =
         new double[][] {
           {20049.5, 20149.5, 6249.5, 8429.808},
@@ -305,6 +308,7 @@ public class RawDataAggregationOperatorTest {
         initRawDataAggregationOperator(
             aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -325,12 +329,13 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(4, count);
   }
 
   /** Test by time interval with EndTime */
   @Test
-  public void groupByTimeRawDataTest3() throws IllegalPathException {
+  public void groupByTimeRawDataTest3() throws Exception {
     int[][] result =
         new int[][] {
           {100, 100, 100, 99},
@@ -367,6 +372,7 @@ public class RawDataAggregationOperatorTest {
         initRawDataAggregationOperator(
             aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -394,12 +400,13 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(4, count);
   }
 
   /** 0 - 99 100 - 199 200 - 299 300 - 399 400 - 499 500 - 599 */
   @Test
-  public void groupByTimeRawDataTest4() throws IllegalPathException {
+  public void groupByTimeRawDataTest4() throws Exception {
     int[][] result =
         new int[][] {
           {100, 100, 100, 100, 100, 0},
@@ -430,6 +437,7 @@ public class RawDataAggregationOperatorTest {
         initRawDataAggregationOperator(
             aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -457,6 +465,7 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(6, count);
   }
 
@@ -465,7 +474,7 @@ public class RawDataAggregationOperatorTest {
    * 501 - 600
    */
   @Test
-  public void groupByTimeRawDataTest5() throws IllegalPathException {
+  public void groupByTimeRawDataTest5() throws Exception {
     int[][] result =
         new int[][] {
           {100, 100, 100, 100, 99, 0},
@@ -496,6 +505,7 @@ public class RawDataAggregationOperatorTest {
         initRawDataAggregationOperator(
             aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -521,12 +531,13 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(6, count);
   }
 
   /** 0 - 259 260 - 299 `300 - 499 */
   @Test
-  public void groupByEventRawDataTest1() throws IllegalPathException {
+  public void groupByEventRawDataTest1() throws Exception {
     int[][] result =
         new int[][] {
           {0, 260, 300},
@@ -567,6 +578,7 @@ public class RawDataAggregationOperatorTest {
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -588,11 +600,12 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(3, count);
   }
 
   @Test
-  public void groupByEventRawDataTest2() throws IllegalPathException {
+  public void groupByEventRawDataTest2() throws Exception {
     int[][] result =
         new int[][] {
           {4019900, 613770, 11180, 827160, 7790, 1044950},
@@ -631,6 +644,7 @@ public class RawDataAggregationOperatorTest {
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -651,11 +665,12 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(6, count);
   }
 
   @Test
-  public void groupByEventRawDataTest3() throws IllegalPathException {
+  public void groupByEventRawDataTest3() throws Exception {
     int[][] result =
         new int[][] {
           {4019900, 613770, 11180, 827160, 7790, 1044950},
@@ -690,6 +705,7 @@ public class RawDataAggregationOperatorTest {
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -709,11 +725,12 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(6, count);
   }
   /** 0 - 199 200 - 259 260 - 299 300 - 379 380 - 399 400 - 499 */
   @Test
-  public void groupByEventRawDataTest4() throws IllegalPathException {
+  public void groupByEventRawDataTest4() throws Exception {
     int[] result =
         new int[] {
           20000, 10200, 260, 10300, 380, 10400,
@@ -739,6 +756,7 @@ public class RawDataAggregationOperatorTest {
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -753,25 +771,25 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(6, count);
   }
 
   @Test
-  public void onePointInOneEqualEventWindowTest() throws IllegalPathException {
+  public void onePointInOneEqualEventWindowTest() throws Exception {
     WindowParameter windowParameter =
         new VariationWindowParameter(TSDataType.INT32, 0, false, true, 0);
     onePointInOneWindowTest(windowParameter);
   }
 
   @Test
-  public void onePointInOneVariationEventWindowTest() throws IllegalPathException {
+  public void onePointInOneVariationEventWindowTest() throws Exception {
     WindowParameter windowParameter =
         new VariationWindowParameter(TSDataType.INT32, 0, false, true, 0.5);
     onePointInOneWindowTest(windowParameter);
   }
 
-  private void onePointInOneWindowTest(WindowParameter windowParameter)
-      throws IllegalPathException {
+  private void onePointInOneWindowTest(WindowParameter windowParameter) throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     List<List<InputLocation[]>> inputLocations = new ArrayList<>();
     for (int i = 0; i < 2; i++) {
@@ -791,6 +809,7 @@ public class RawDataAggregationOperatorTest {
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
 
     int resultMinTime1 = -1, resultMinTime2 = -1;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -815,12 +834,13 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(resultMinTime1, 499);
     assertEquals(resultMinTime2, 499);
   }
 
   @Test
-  public void groupBySessionRawDataTest1() throws IllegalPathException {
+  public void groupBySessionRawDataTest1() throws Exception {
     int[][] result = new int[][] {{0}, {499}, {20000}, {10499}};
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     List<List<InputLocation[]>> inputLocations = new ArrayList<>();
@@ -854,6 +874,7 @@ public class RawDataAggregationOperatorTest {
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
     int count = 0;
+
     while (rawDataAggregationOperator.isBlocked().isDone()
         && rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -875,6 +896,7 @@ public class RawDataAggregationOperatorTest {
         }
       }
     }
+
     assertEquals(1, count);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 36c815927a..06ea323617 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -91,7 +91,7 @@ public class SeriesAggregationScanOperatorTest {
   }
 
   @Test
-  public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+  public void testAggregationWithoutTimeFilter() throws Exception {
     List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
     List<Aggregator> aggregators = new ArrayList<>();
     AccumulatorFactory.createAccumulators(
@@ -104,16 +104,18 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, true, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws IllegalPathException {
+  public void testAggregationWithoutTimeFilterOrderByTimeDesc() throws Exception {
     List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
     List<Aggregator> aggregators = new ArrayList<>();
     AccumulatorFactory.createAccumulators(
@@ -126,16 +128,18 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, false, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testMultiAggregationFuncWithoutTimeFilter1() throws IllegalPathException {
+  public void testMultiAggregationFuncWithoutTimeFilter1() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.COUNT);
     aggregationTypes.add(TAggregationType.SUM);
@@ -150,17 +154,19 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, true, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
       assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testMultiAggregationFuncWithoutTimeFilter2() throws IllegalPathException {
+  public void testMultiAggregationFuncWithoutTimeFilter2() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.FIRST_VALUE);
     aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -179,6 +185,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, true, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
@@ -189,12 +196,12 @@ public class SeriesAggregationScanOperatorTest {
       assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc()
-      throws IllegalPathException {
+  public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.FIRST_VALUE);
     aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -213,6 +220,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, false, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
@@ -223,11 +231,12 @@ public class SeriesAggregationScanOperatorTest {
       assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testAggregationWithTimeFilter1() throws IllegalPathException {
+  public void testAggregationWithTimeFilter1() throws Exception {
     List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
     List<Aggregator> aggregators = new ArrayList<>();
     AccumulatorFactory.createAccumulators(
@@ -241,16 +250,18 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testAggregationWithTimeFilter2() throws IllegalPathException {
+  public void testAggregationWithTimeFilter2() throws Exception {
     Filter timeFilter = TimeFilter.ltEq(379);
     List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
     List<Aggregator> aggregators = new ArrayList<>();
@@ -264,16 +275,18 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testAggregationWithTimeFilter3() throws IllegalPathException {
+  public void testAggregationWithTimeFilter3() throws Exception {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
     List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
     List<Aggregator> aggregators = new ArrayList<>();
@@ -287,16 +300,18 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(resultTsBlock.getColumn(0).getLong(0), 300);
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testMultiAggregationWithTimeFilter() throws IllegalPathException {
+  public void testMultiAggregationWithTimeFilter() throws Exception {
     List<TAggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(TAggregationType.FIRST_VALUE);
     aggregationTypes.add(TAggregationType.LAST_VALUE);
@@ -316,6 +331,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       assertEquals(20100, resultTsBlock.getColumn(0).getInt(0));
@@ -326,11 +342,12 @@ public class SeriesAggregationScanOperatorTest {
       assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
       count++;
     }
+
     assertEquals(1, count);
   }
 
   @Test
-  public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
+  public void testGroupByWithoutGlobalTimeFilter() throws Exception {
     int[] result = new int[] {100, 100, 100, 99};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
@@ -345,6 +362,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       int positionCount = resultTsBlock.getPositionCount();
@@ -354,11 +372,12 @@ public class SeriesAggregationScanOperatorTest {
         count++;
       }
     }
+
     assertEquals(4, count);
   }
 
   @Test
-  public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
+  public void testGroupByWithGlobalTimeFilter() throws Exception {
     int[] result = new int[] {0, 80, 100, 80};
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
@@ -374,6 +393,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, timeFilter, true, groupByTimeParameter);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       int positionCount = resultTsBlock.getPositionCount();
@@ -383,11 +403,12 @@ public class SeriesAggregationScanOperatorTest {
         count++;
       }
     }
+
     assertEquals(4, count);
   }
 
   @Test
-  public void testGroupByWithMultiFunction() throws IllegalPathException {
+  public void testGroupByWithMultiFunction() throws Exception {
     int[][] result =
         new int[][] {
           {20000, 20100, 10200, 10300},
@@ -412,6 +433,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       int positionCount = resultTsBlock.getPositionCount();
@@ -424,11 +446,12 @@ public class SeriesAggregationScanOperatorTest {
         count++;
       }
     }
+
     assertEquals(4, count);
   }
 
   @Test
-  public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
+  public void testGroupByWithMultiFunctionOrderByTimeDesc() throws Exception {
     int[][] result =
         new int[][] {
           {20000, 20100, 10200, 10300},
@@ -453,6 +476,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, false, groupByTimeParameter);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       int positionCount = resultTsBlock.getPositionCount();
@@ -465,11 +489,12 @@ public class SeriesAggregationScanOperatorTest {
         count++;
       }
     }
+
     assertEquals(4, count);
   }
 
   @Test
-  public void testGroupBySlidingTimeWindow() throws IllegalPathException {
+  public void testGroupBySlidingTimeWindow() throws Exception {
     int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 49};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
     List<TAggregationType> aggregationTypes = Collections.singletonList(TAggregationType.COUNT);
@@ -484,6 +509,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       int positionCount = resultTsBlock.getPositionCount();
@@ -493,11 +519,12 @@ public class SeriesAggregationScanOperatorTest {
         count++;
       }
     }
+
     assertEquals(result.length, count);
   }
 
   @Test
-  public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
+  public void testGroupBySlidingTimeWindow2() throws Exception {
     int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
     int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
@@ -513,6 +540,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       int positionCount = resultTsBlock.getPositionCount();
@@ -522,11 +550,12 @@ public class SeriesAggregationScanOperatorTest {
         count++;
       }
     }
+
     assertEquals(timeColumn.length, count);
   }
 
   @Test
-  public void testGroupBySlidingWindowWithMultiFunction() throws IllegalPathException {
+  public void testGroupBySlidingWindowWithMultiFunction() throws Exception {
     int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
     int[][] result =
         new int[][] {
@@ -552,6 +581,7 @@ public class SeriesAggregationScanOperatorTest {
     SeriesAggregationScanOperator seriesAggregationScanOperator =
         initSeriesAggregationScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
+
     while (seriesAggregationScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregationScanOperator.next();
       int positionCount = resultTsBlock.getPositionCount();
@@ -564,6 +594,7 @@ public class SeriesAggregationScanOperatorTest {
         count++;
       }
     }
+
     assertEquals(timeColumn.length, count);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesScanOperatorTest.java
index d79cb7fad9..2c0721684f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesScanOperatorTest.java
@@ -80,7 +80,7 @@ public class SeriesScanOperatorTest {
   }
 
   @Test
-  public void batchTest() {
+  public void batchTest() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
index b935d1d665..cd1a9a48a7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -194,7 +193,7 @@ public class SingleDeviceViewOperatorTest {
         count++;
       }
       assertEquals(500, total);
-    } catch (IllegalPathException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
index 50d2ab026f..105f468a59 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SlidingWindowAggregationOperatorTest.java
@@ -133,7 +133,7 @@ public class SlidingWindowAggregationOperatorTest {
   }
 
   @Test
-  public void slidingWindowAggregationTest() throws IllegalPathException {
+  public void slidingWindowAggregationTest() throws Exception {
     String[] retArray =
         new String[] {
           "0,100,20049.5,2004950.0,20099,0,99,20000,20099,20000",
@@ -175,6 +175,7 @@ public class SlidingWindowAggregationOperatorTest {
         count--;
       }
     }
+
     Assert.assertEquals(0, count);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
index 70c768c1cf..ef122f78b8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
@@ -87,7 +87,7 @@ public class TimeJoinOperatorTest {
   }
 
   @Test
-  public void batchTest1() {
+  public void batchTest1() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -182,7 +182,7 @@ public class TimeJoinOperatorTest {
 
   /** test time join with non-exist sensor */
   @Test
-  public void batchTest2() {
+  public void batchTest2() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -298,7 +298,7 @@ public class TimeJoinOperatorTest {
 
   /** test time join with non-exist sensor and order by time desc */
   @Test
-  public void batchTest3() {
+  public void batchTest3() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index 953da1b480..a9fc842802 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -113,7 +113,7 @@ public class UpdateLastCacheOperatorTest {
       assertFalse(updateLastCacheOperator.hasNext());
       assertTrue(updateLastCacheOperator.isFinished());
 
-    } catch (IllegalPathException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
@@ -143,7 +143,7 @@ public class UpdateLastCacheOperatorTest {
       assertFalse(updateLastCacheOperator.hasNext());
       assertTrue(updateLastCacheOperator.isFinished());
 
-    } catch (IllegalPathException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
@@ -173,7 +173,7 @@ public class UpdateLastCacheOperatorTest {
       assertFalse(updateLastCacheOperator.hasNext());
       assertTrue(updateLastCacheOperator.isFinished());
 
-    } catch (IllegalPathException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
index ec10473b9b..5bf7e1a656 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperatorTest.java
@@ -55,7 +55,7 @@ public class SchemaCountOperatorTest {
   private static final String SCHEMA_COUNT_OPERATOR_TEST_SG = "root.SchemaCountOperatorTest";
 
   @Test
-  public void testSchemaCountOperator() {
+  public void testSchemaCountOperator() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -184,7 +184,7 @@ public class SchemaCountOperatorTest {
     }
   }
 
-  private List<TsBlock> collectResult(CountGroupByLevelScanOperator<?> operator) {
+  private List<TsBlock> collectResult(CountGroupByLevelScanOperator<?> operator) throws Exception {
     List<TsBlock> tsBlocks = new ArrayList<>();
     while (operator.hasNext()) {
       TsBlock tsBlock = operator.next();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
index 434d509c75..7adb9b07bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperatorTest.java
@@ -64,7 +64,7 @@ public class SchemaQueryScanOperatorTest {
   private static final String META_SCAN_OPERATOR_TEST_SG = "root.MetaScanOperatorTest";
 
   @Test
-  public void testDeviceSchemaScan() {
+  public void testDeviceSchemaScan() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
@@ -156,7 +156,7 @@ public class SchemaQueryScanOperatorTest {
   }
 
   @Test
-  public void testTimeSeriesSchemaScan() {
+  public void testTimeSeriesSchemaScan() throws Exception {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
     try {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index bb1e0bfb4b..671a24a39a 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -63,7 +63,7 @@ struct TGetDataBlockRequest {
   1: required TFragmentInstanceId sourceFragmentInstanceId
   2: required i32 startSequenceId
   3: required i32 endSequenceId
-  // index of upstream SinkHandle
+  // index of upstream SinkChannel
   4: required i32 index
 }
 
@@ -75,10 +75,16 @@ struct TAcknowledgeDataBlockEvent {
   1: required TFragmentInstanceId sourceFragmentInstanceId
   2: required i32 startSequenceId
   3: required i32 endSequenceId
-  // index of upstream SinkHandle
+  // index of upstream SinkChannel
   4: required i32 index
 }
 
+struct TCloseSinkChannelEvent {
+  1: required TFragmentInstanceId sourceFragmentInstanceId
+  // index of upstream SinkChannel
+  2: required i32 index
+}
+
 struct TNewDataBlockEvent {
   1: required TFragmentInstanceId targetFragmentInstanceId
   2: required string targetPlanNodeId
@@ -766,6 +772,8 @@ service MPPDataExchangeService {
 
   void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e);
 
+  void onCloseSinkChannelEvent(TCloseSinkChannelEvent e);
+
   void onNewDataBlockEvent(TNewDataBlockEvent e);
 
   void onEndOfDataBlockEvent(TEndOfDataBlockEvent e);