You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/28 10:35:22 UTC

[iotdb] branch master updated: [IOTDB-3021] Fix sink/source handle memory leak (#5692)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c1abdd894 [IOTDB-3021] Fix sink/source handle memory leak (#5692)
4c1abdd894 is described below

commit 4c1abdd894911d9790808105c1e0a5bd194ff88f
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Thu Apr 28 18:35:16 2022 +0800

    [IOTDB-3021] Fix sink/source handle memory leak (#5692)
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  14 +--
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |  19 ++--
 .../apache/iotdb/db/mpp/buffer/ISourceHandle.java  |  13 +--
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 104 ++++++++-----------
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   | 112 ++++++++++-----------
 .../org/apache/iotdb/db/mpp/execution/Driver.java  |   2 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |   4 +-
 .../org/apache/iotdb/db/mpp/memory/MemoryPool.java |  25 ++++-
 .../db/mpp/operator/source/ExchangeOperator.java   |   2 +-
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java |  69 ++++++-------
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |  76 ++++++--------
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |  20 ++--
 .../apache/iotdb/db/mpp/memory/MemoryPoolTest.java |  25 +++++
 13 files changed, 240 insertions(+), 245 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index e4d38df2f1..20b7be00bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -54,7 +54,7 @@ public class DataBlockManager implements IDataBlockManager {
   public interface SourceHandleListener {
     void onFinished(ISourceHandle sourceHandle);
 
-    void onClosed(ISourceHandle sourceHandle);
+    void onAborted(ISourceHandle sourceHandle);
 
     void onFailure(ISourceHandle sourceHandle, Throwable t);
   }
@@ -62,7 +62,7 @@ public class DataBlockManager implements IDataBlockManager {
   public interface SinkHandleListener {
     void onFinish(ISinkHandle sinkHandle);
 
-    void onClosed(ISinkHandle sinkHandle);
+    void onEndOfBlocks(ISinkHandle sinkHandle);
 
     void onAborted(ISinkHandle sinkHandle);
 
@@ -130,7 +130,7 @@ public class DataBlockManager implements IDataBlockManager {
           || sourceHandles
               .get(e.getTargetFragmentInstanceId())
               .get(e.getTargetPlanNodeId())
-              .isClosed()) {
+              .isAborted()) {
         throw new TException(
             "Target fragment instance not found. Fragment instance ID: "
                 + e.getTargetFragmentInstanceId()
@@ -156,7 +156,7 @@ public class DataBlockManager implements IDataBlockManager {
           || sourceHandles
               .get(e.getTargetFragmentInstanceId())
               .get(e.getTargetPlanNodeId())
-              .isClosed()) {
+              .isAborted()) {
         throw new TException(
             "Target fragment instance not found. Fragment instance ID: "
                 + e.getTargetFragmentInstanceId()
@@ -200,7 +200,7 @@ public class DataBlockManager implements IDataBlockManager {
     }
 
     @Override
-    public void onClosed(ISourceHandle sourceHandle) {
+    public void onAborted(ISourceHandle sourceHandle) {
       onFinished(sourceHandle);
     }
 
@@ -236,7 +236,7 @@ public class DataBlockManager implements IDataBlockManager {
     }
 
     @Override
-    public void onClosed(ISinkHandle sinkHandle) {
+    public void onEndOfBlocks(ISinkHandle sinkHandle) {
       context.transitionToFlushing();
     }
 
@@ -379,7 +379,7 @@ public class DataBlockManager implements IDataBlockManager {
       Map<String, SourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
       for (Entry<String, SourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
         logger.info("Close source handle {}", sourceHandles);
-        entry.getValue().close();
+        entry.getValue().abort();
       }
       sourceHandles.remove(fragmentInstanceId);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index b28b812175..da42a93dc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -51,27 +51,20 @@ public interface ISinkHandle {
   void send(int partition, List<TsBlock> tsBlocks);
 
   /**
-   * Notify the handle that no more tsblocks will be sent. Any future calls to send a tsblock should
-   * be ignored.
+   * Notify the handle that there are no more tsblocks. Any future calls to send a tsblock should be
+   * ignored.
    */
   void setNoMoreTsBlocks();
 
-  /** If the handle is closed. */
-  boolean isClosed();
+  /** If the handle is aborted. */
+  boolean isAborted();
 
   /**
-   * If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
-   * instances.
+   * If there are no more tsblocks to be sent and all the tsblocks have been fetched by downstream
+   * fragment instances.
    */
   boolean isFinished();
 
-  /**
-   * Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
-   * downstream instances. A {@link RuntimeException} will be thrown if any exception happened
-   * during the data transmission.
-   */
-  void close();
-
   /**
    * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
    * the future returned by {@link #isFull()}.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index e9be12d838..1df8f3d18b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.Closeable;
-
-public interface ISourceHandle extends Closeable {
+public interface ISourceHandle {
 
   /** Get the local fragment instance ID that this source handle belongs to. */
   TFragmentInstanceId getLocalFragmentInstanceId();
@@ -48,13 +46,12 @@ public interface ISourceHandle extends Closeable {
   /** Get a future that will be completed when the input buffer is not empty. */
   ListenableFuture<Void> isBlocked();
 
-  /** If this handle is closed. */
-  boolean isClosed();
+  /** If this handle is aborted. */
+  boolean isAborted();
 
   /**
-   * Close the handle. Discard all tsblocks which may still be in the memory buffer and complete the
+   * Abort the handle. Discard all tsblocks which may still be in the memory buffer and complete the
    * future returned by {@link #isBlocked()}.
    */
-  @Override
-  void close();
+  void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index eea8850869..63c6d89de6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -74,8 +74,10 @@ public class SinkHandle implements ISinkHandle {
 
   private volatile ListenableFuture<Void> blocked = immediateFuture(null);
   private int nextSequenceId = 0;
+  /** The actual buffered memory in bytes, including the amount of memory being reserved. */
   private long bufferRetainedSizeInBytes = 0;
-  private boolean closed = false;
+
+  private boolean aborted = false;
   private boolean noMoreTsBlocks = false;
 
   public SinkHandle(
@@ -100,9 +102,9 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public ListenableFuture<Void> isFull() {
-    if (closed) {
-      throw new IllegalStateException("Sink handle is closed.");
+  public synchronized ListenableFuture<Void> isFull() {
+    if (aborted) {
+      throw new IllegalStateException("Sink handle is aborted.");
     }
     return nonCancellationPropagating(blocked);
   }
@@ -112,10 +114,10 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) {
+  public synchronized void send(List<TsBlock> tsBlocks) {
     Validate.notNull(tsBlocks, "tsBlocks is null");
-    if (closed) {
-      throw new IllegalStateException("Sink handle is closed.");
+    if (aborted) {
+      throw new IllegalStateException("Sink handle is aborted.");
     }
     if (!blocked.isDone()) {
       throw new IllegalStateException("Sink handle is blocked.");
@@ -123,27 +125,24 @@ public class SinkHandle implements ISinkHandle {
     if (noMoreTsBlocks) {
       return;
     }
-
     long retainedSizeInBytes = 0L;
     for (TsBlock tsBlock : tsBlocks) {
       retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
     }
     int startSequenceId;
     List<Long> tsBlockSizes = new ArrayList<>();
-    synchronized (this) {
-      startSequenceId = nextSequenceId;
-      blocked =
-          localMemoryManager
-              .getQueryPool()
-              .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
-      bufferRetainedSizeInBytes += retainedSizeInBytes;
-      for (TsBlock tsBlock : tsBlocks) {
-        sequenceIdToTsBlock.put(nextSequenceId, tsBlock);
-        nextSequenceId += 1;
-      }
-      for (int i = startSequenceId; i < nextSequenceId; i++) {
-        tsBlockSizes.add(sequenceIdToTsBlock.get(i).getRetainedSizeInBytes());
-      }
+    startSequenceId = nextSequenceId;
+    blocked =
+        localMemoryManager
+            .getQueryPool()
+            .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
+    bufferRetainedSizeInBytes += retainedSizeInBytes;
+    for (TsBlock tsBlock : tsBlocks) {
+      sequenceIdToTsBlock.put(nextSequenceId, tsBlock);
+      nextSequenceId += 1;
+    }
+    for (int i = startSequenceId; i < nextSequenceId; i++) {
+      tsBlockSizes.add(sequenceIdToTsBlock.get(i).getRetainedSizeInBytes());
     }
 
     // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
@@ -151,7 +150,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(int partition, List<TsBlock> tsBlocks) {
+  public synchronized void send(int partition, List<TsBlock> tsBlocks) {
     throw new UnsupportedOperationException();
   }
 
@@ -210,9 +209,9 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void close() {
-    logger.info("Sink handle {} is being closed.", this);
-    if (closed) {
+  public synchronized void setNoMoreTsBlocks() {
+    logger.info("Setting no-more-tsblocks to {}", this);
+    if (aborted) {
       return;
     }
     try {
@@ -220,51 +219,33 @@ public class SinkHandle implements ISinkHandle {
     } catch (Exception e) {
       throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
     }
-    synchronized (this) {
-      closed = true;
-      // synchronized is reentrant lock, wo we can invoke setNoMoreTsBlocks() here.
-      setNoMoreTsBlocks();
+    noMoreTsBlocks = true;
+    if (isFinished()) {
+      sinkHandleListener.onFinish(this);
     }
-    sinkHandleListener.onClosed(this);
-    logger.info("Sink handle {} is closed.", this);
+    sinkHandleListener.onEndOfBlocks(this);
+    logger.info("No more tsblocks has been set to {}", this);
   }
 
   @Override
-  public void abort() {
+  public synchronized void abort() {
     logger.info("Sink handle {} is being aborted.", this);
-    synchronized (this) {
-      sequenceIdToTsBlock.clear();
-      closed = true;
-      if (blocked != null && !blocked.isDone()) {
-        blocked.cancel(true);
-      }
-      if (bufferRetainedSizeInBytes > 0) {
-        localMemoryManager
-            .getQueryPool()
-            .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
-        bufferRetainedSizeInBytes = 0;
-      }
+    sequenceIdToTsBlock.clear();
+    aborted = true;
+    bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+    if (bufferRetainedSizeInBytes > 0) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
     }
     sinkHandleListener.onAborted(this);
     logger.info("Sink handle {} is aborted", this);
   }
 
   @Override
-  public synchronized void setNoMoreTsBlocks() {
-    noMoreTsBlocks = true;
-    // In current implementation, the onFinish() is only invoked when receiving the
-    // acknowledge event from SourceHandle. If the acknowledge event happens before
-    // the close(), the onFinish() won't be invoked and the instance's status will
-    // always be FLUSHING. We cannot ensure the sequence of `acknowledge event` and
-    // `close` so we need to do following check every time `noMoreTsBlocks` is updated.
-    if (isFinished()) {
-      sinkHandleListener.onFinish(this);
-    }
-  }
-
-  @Override
-  public boolean isClosed() {
-    return closed;
+  public boolean isAborted() {
+    return aborted;
   }
 
   @Override
@@ -297,6 +278,9 @@ public class SinkHandle implements ISinkHandle {
   void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
     long freedBytes = 0L;
     synchronized (this) {
+      if (aborted) {
+        return;
+      }
       Iterator<Entry<Integer, TsBlock>> iterator = sequenceIdToTsBlock.entrySet().iterator();
       while (iterator.hasNext()) {
         Entry<Integer, TsBlock> entry = iterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 1ebb694b76..6037ab8779 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -69,12 +69,17 @@ public class SourceHandle implements ISourceHandle {
   private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
       dataBlockServiceClientManager;
 
-  private volatile SettableFuture<Void> blocked = SettableFuture.create();
+  private SettableFuture<Void> blocked = SettableFuture.create();
+
+  private ListenableFuture<Void> blockedOnMemory;
+
+  /** The actual buffered memory in bytes, including the amount of memory being reserved. */
   private long bufferRetainedSizeInBytes = 0L;
+
   private int currSequenceId = 0;
   private int nextSequenceId = 0;
   private int lastSequenceId = Integer.MAX_VALUE;
-  private boolean closed = false;
+  private boolean aborted = false;
 
   public SourceHandle(
       TEndPoint remoteEndpoint,
@@ -99,25 +104,24 @@ public class SourceHandle implements ISourceHandle {
   }
 
   @Override
-  public TsBlock receive() {
-    if (closed) {
-      throw new IllegalStateException("Source handle is closed.");
+  public synchronized TsBlock receive() {
+    if (aborted) {
+      throw new IllegalStateException("Source handle is aborted.");
     }
     if (!blocked.isDone()) {
       throw new IllegalStateException("Source handle is blocked.");
     }
-    TsBlock tsBlock;
-    synchronized (this) {
-      tsBlock = sequenceIdToTsBlock.remove(currSequenceId);
-      currSequenceId += 1;
-      bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
-      localMemoryManager
-          .getQueryPool()
-          .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
 
-      if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
-        blocked = SettableFuture.create();
-      }
+    TsBlock tsBlock;
+    tsBlock = sequenceIdToTsBlock.remove(currSequenceId);
+    currSequenceId += 1;
+    bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+    localMemoryManager
+        .getQueryPool()
+        .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+
+    if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
+      blocked = SettableFuture.create();
     }
     if (isFinished()) {
       sourceHandleListener.onFinished(this);
@@ -127,10 +131,17 @@ public class SourceHandle implements ISourceHandle {
   }
 
   private synchronized void trySubmitGetDataBlocksTask() {
+    if (aborted) {
+      return;
+    }
+    if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
+      return;
+    }
+
     final int startSequenceId = nextSequenceId;
     int endSequenceId = nextSequenceId;
     long reservedBytes = 0L;
-    ListenableFuture<?> future = null;
+    ListenableFuture<Void> future = null;
     while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) {
       Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId);
       if (bytesToReserve == null) {
@@ -140,11 +151,10 @@ public class SourceHandle implements ISourceHandle {
           localMemoryManager
               .getQueryPool()
               .reserve(localFragmentInstanceId.getQueryId(), bytesToReserve);
-      if (future.isDone()) {
-        endSequenceId += 1;
-        reservedBytes += bytesToReserve;
-        bufferRetainedSizeInBytes += bytesToReserve;
-      } else {
+      bufferRetainedSizeInBytes += bytesToReserve;
+      endSequenceId += 1;
+      reservedBytes += bytesToReserve;
+      if (!future.isDone()) {
         break;
       }
     }
@@ -154,48 +164,27 @@ public class SourceHandle implements ISourceHandle {
       return;
     }
 
-    if (future.isDone()) {
-      nextSequenceId = endSequenceId;
-      executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
-    } else {
-      nextSequenceId = endSequenceId + 1;
+    nextSequenceId = endSequenceId;
+    executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
+    if (!future.isDone()) {
+      blockedOnMemory = future;
       // The future being not completed indicates,
       //   1. Memory has been reserved for blocks in [startSequenceId, endSequenceId).
-      //   2. Memory reservation for block whose sequence ID equals endSequenceId is blocked.
+      //   2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked.
       //   3. Have not reserve memory for the rest of blocks.
       //
-      //  startSequenceId             endSequenceId  endSequenceId + 1
+      //  startSequenceId          endSequenceId - 1  endSequenceId
       //         |-------- reserved --------|--- blocked ---|--- not reserved ---|
 
-      if (endSequenceId > startSequenceId) {
-        // Memory has been reserved. Submit a GetDataBlocksTask for these blocks.
-        executorService.submit(
-            new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
-      }
-
-      // Submit a GetDataBlocksTask when memory is freed.
-      final int sequenceIdOfUnReservedDataBlock = endSequenceId;
-      final long sizeOfUnReservedDataBlock = sequenceIdToDataBlockSize.get(endSequenceId);
-      future.addListener(
-          () -> {
-            executorService.submit(
-                new GetDataBlocksTask(
-                    sequenceIdOfUnReservedDataBlock,
-                    sequenceIdOfUnReservedDataBlock + 1,
-                    sizeOfUnReservedDataBlock));
-            bufferRetainedSizeInBytes += sizeOfUnReservedDataBlock;
-          },
-          executorService);
-
       // Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks.
       future.addListener(SourceHandle.this::trySubmitGetDataBlocksTask, executorService);
     }
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
-    if (closed) {
-      throw new IllegalStateException("Source handle is closed.");
+  public synchronized ListenableFuture<Void> isBlocked() {
+    if (aborted) {
+      throw new IllegalStateException("Source handle is aborted.");
     }
     return nonCancellationPropagating(blocked);
   }
@@ -205,6 +194,9 @@ public class SourceHandle implements ISourceHandle {
     if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
       blocked.set(null);
     }
+    if (isFinished()) {
+      sourceHandleListener.onFinished(this);
+    }
   }
 
   synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
@@ -215,13 +207,14 @@ public class SourceHandle implements ISourceHandle {
   }
 
   @Override
-  public synchronized void close() {
-    if (closed) {
+  public synchronized void abort() {
+    if (aborted) {
       return;
     }
     if (blocked != null && !blocked.isDone()) {
       blocked.cancel(true);
     }
+    bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
     sequenceIdToDataBlockSize.clear();
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
@@ -229,8 +222,8 @@ public class SourceHandle implements ISourceHandle {
           .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    closed = true;
-    sourceHandleListener.onClosed(this);
+    aborted = true;
+    sourceHandleListener.onAborted(this);
   }
 
   @Override
@@ -267,8 +260,8 @@ public class SourceHandle implements ISourceHandle {
   }
 
   @Override
-  public boolean isClosed() {
-    return closed;
+  public boolean isAborted() {
+    return aborted;
   }
 
   @Override
@@ -335,7 +328,7 @@ public class SourceHandle implements ISourceHandle {
             tsBlocks.add(tsBlock);
           }
           synchronized (SourceHandle.this) {
-            if (closed) {
+            if (aborted) {
               return;
             }
             for (int i = startSequenceId; i < endSequenceId; i++) {
@@ -372,7 +365,6 @@ public class SourceHandle implements ISourceHandle {
           }
         }
       }
-      // TODO: try to issue another GetDataBlocksTask to make the query run faster.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index ae220e4911..41da1197a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -335,7 +335,7 @@ public abstract class Driver implements IDriver {
 
     try {
       root.close();
-      sinkHandle.close();
+      sinkHandle.setNoMoreTsBlocks();
     } catch (InterruptedException t) {
       // don't record the stack
       wasInterrupted = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 9a254de2d7..cc5565c0eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -206,7 +206,7 @@ public class QueryExecution implements IQueryExecution {
     //   1. The client fetch all the result and the ResultHandle is finished.
     //   2. The client's connection is closed that all owned QueryExecution should be cleaned up
     if (resultHandle != null && resultHandle.isFinished()) {
-      resultHandle.close();
+      resultHandle.abort();
     }
   }
 
@@ -220,7 +220,7 @@ public class QueryExecution implements IQueryExecution {
   @Override
   public TsBlock getBatchResult() {
     try {
-      if (resultHandle.isClosed() || resultHandle.isFinished()) {
+      if (resultHandle.isAborted() || resultHandle.isFinished()) {
         return null;
       }
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
index f634b71a66..14950ed94c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
@@ -35,7 +35,7 @@ import java.util.Queue;
 /** A thread-safe memory pool. */
 public class MemoryPool {
 
-  private static class MemoryReservationFuture<V> extends AbstractFuture<V> {
+  public static class MemoryReservationFuture<V> extends AbstractFuture<V> {
     private final String queryId;
     private final long bytes;
 
@@ -133,6 +133,26 @@ public class MemoryPool {
     return true;
   }
 
+  /**
+   * Cancel the specified memory reservation. If the reservation has finished, do nothing.
+   *
+   * @param future The future returned from {@link #reserve(String, long)}
+   * @return If the future has not completed, return the number of bytes being reserved. Otherwise,
+   *     return 0.
+   */
+  public synchronized long tryCancel(ListenableFuture<Void> future) {
+    if (future.isDone()) {
+      return 0L;
+    }
+
+    Validate.notNull(future);
+    Validate.isTrue(
+        future instanceof MemoryReservationFuture,
+        "invalid future type " + future.getClass().getSimpleName());
+    future.cancel(true);
+    return ((MemoryReservationFuture<Void>) future).getBytes();
+  }
+
   public synchronized void free(String queryId, long bytes) {
     Validate.notNull(queryId);
     Validate.isTrue(bytes > 0L);
@@ -155,12 +175,9 @@ public class MemoryPool {
     Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
     while (iterator.hasNext()) {
       MemoryReservationFuture<Void> future = iterator.next();
-
       if (future.isCancelled()) {
-        iterator.remove();
         continue;
       }
-
       long bytesToReserve = future.getBytes();
       if (maxBytes - reservedBytes < bytesToReserve) {
         return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 26d4250acc..9a0b115e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -81,6 +81,6 @@ public class ExchangeOperator implements SourceOperator {
 
   @Override
   public void close() throws Exception {
-    sourceHandle.close();
+    sourceHandle.abort();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index 8ac22daac7..376dff6dcb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -95,16 +95,15 @@ public class SinkHandleTest {
             mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
     sinkHandle.send(mockTsBlocks);
-    sinkHandle.setNoMoreTsBlocks();
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
         mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
@@ -138,20 +137,23 @@ public class SinkHandleTest {
     }
     Assert.assertFalse(sinkHandle.isFinished());
 
+    // Set no-more-tsblocks.
+    sinkHandle.setNoMoreTsBlocks();
+    Assert.assertTrue(sinkHandle.isFull().isDone());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertFalse(sinkHandle.isAborted());
+    Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onEndOfBlocks(sinkHandle);
+
     // Ack tsblocks.
     sinkHandle.acknowledgeTsBlock(0, numOfMockTsBlock);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertTrue(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(1))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
 
-    // Close the SinkHandle.
-    sinkHandle.close();
-    Assert.assertTrue(sinkHandle.isClosed());
-    Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
     try {
       Thread.sleep(100L);
       Mockito.verify(mockClient, Mockito.times(1))
@@ -222,7 +224,7 @@ public class SinkHandleTest {
             mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
@@ -230,7 +232,7 @@ public class SinkHandleTest {
     sinkHandle.send(mockTsBlocks);
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
         mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
@@ -268,7 +270,7 @@ public class SinkHandleTest {
     sinkHandle.acknowledgeTsBlock(0, numOfMockTsBlock);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(1))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
@@ -277,7 +279,7 @@ public class SinkHandleTest {
     sinkHandle.send(mockTsBlocks);
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
         mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
@@ -299,12 +301,11 @@ public class SinkHandleTest {
       Assert.fail();
     }
 
-    // Close the SinkHandle.
+    // Set no-more-tsblocks.
     sinkHandle.setNoMoreTsBlocks();
     Assert.assertFalse(sinkHandle.isFinished());
-    sinkHandle.close();
-    Assert.assertTrue(sinkHandle.isClosed());
-    Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
+    Assert.assertFalse(sinkHandle.isAborted());
+    Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onEndOfBlocks(sinkHandle);
     try {
       Thread.sleep(100L);
       Mockito.verify(mockClient, Mockito.times(1))
@@ -320,7 +321,7 @@ public class SinkHandleTest {
       Assert.fail();
     }
 
-    // Get tsblocks after the SinkHandle is closed.
+    // Get tsblocks after no-more-tsblocks is set.
     for (int i = numOfMockTsBlock; i < numOfMockTsBlock * 2; i++) {
       try {
         sinkHandle.getSerializedTsBlock(i);
@@ -334,7 +335,7 @@ public class SinkHandleTest {
     // Ack tsblocks.
     sinkHandle.acknowledgeTsBlock(numOfMockTsBlock, numOfMockTsBlock * 2);
     Assert.assertTrue(sinkHandle.isFinished());
-    Assert.assertTrue(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(2))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
@@ -395,16 +396,15 @@ public class SinkHandleTest {
             mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
     sinkHandle.send(mockTsBlocks);
-    sinkHandle.setNoMoreTsBlocks();
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
         mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
@@ -425,22 +425,21 @@ public class SinkHandleTest {
       e.printStackTrace();
       Assert.fail();
     }
-
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFailure(sinkHandle, mockException);
 
     // Close the SinkHandle.
     try {
-      sinkHandle.close();
+      sinkHandle.setNoMoreTsBlocks();
       Assert.fail("Expect an RuntimeException.");
     } catch (RuntimeException e) {
       Assert.assertEquals("Send EndOfDataBlockEvent failed", e.getMessage());
     }
-    Assert.assertFalse(sinkHandle.isClosed());
-    Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onClosed(sinkHandle);
+    Assert.assertFalse(sinkHandle.isAborted());
+    Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onEndOfBlocks(sinkHandle);
 
     // Abort the SinkHandle.
     sinkHandle.abort();
-    Assert.assertTrue(sinkHandle.isClosed());
+    Assert.assertTrue(sinkHandle.isAborted());
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle);
     Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onFinish(sinkHandle);
   }
@@ -459,9 +458,9 @@ public class SinkHandleTest {
 
     // Construct a mock LocalMemoryManager that returns blocked futures.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
-    MemoryPool mockMemoryPool =
-        Utils.createMockBlockedMemoryPool(queryId, numOfMockTsBlock, mockTsBlockSize);
-    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    MemoryPool spyMemoryPool =
+        Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 10 * mockTsBlockSize));
+    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
 
     // Construct a mock SinkHandleListener.
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
@@ -499,28 +498,30 @@ public class SinkHandleTest {
             mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
     sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks);
     Future<?> blocked = sinkHandle.isFull();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(blocked.isCancelled());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
-    Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
+        2 * mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(2 * numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
 
     sinkHandle.abort();
     Assert.assertTrue(blocked.isDone());
     Assert.assertTrue(blocked.isCancelled());
     Assert.assertFalse(sinkHandle.isFinished());
-    Assert.assertTrue(sinkHandle.isClosed());
+    Assert.assertTrue(sinkHandle.isAborted());
     Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle);
+    Assert.assertEquals(0L, spyMemoryPool.getQueryMemoryReservedBytes(queryId));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 87d76463db..c8c6c00329 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -103,7 +103,7 @@ public class SourceHandleTest {
             mockSourceHandleListener,
             mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
 
@@ -134,22 +134,20 @@ public class SourceHandleTest {
       Assert.fail();
     }
     Assert.assertTrue(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(
         numOfMockTsBlock * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
-
       sourceHandle.receive();
-
       if (i < numOfMockTsBlock - 1) {
         Assert.assertTrue(sourceHandle.isBlocked().isDone());
       } else {
         Assert.assertFalse(sourceHandle.isBlocked().isDone());
       }
-      Assert.assertFalse(sourceHandle.isClosed());
+      Assert.assertFalse(sourceHandle.isAborted());
       Assert.assertFalse(sourceHandle.isFinished());
       Assert.assertEquals(
           (numOfMockTsBlock - 1 - i) * mockTsBlockSize,
@@ -159,14 +157,10 @@ public class SourceHandleTest {
     // Receive EndOfDataBlock event from upstream fragment instance.
     sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
     Assert.assertTrue(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
-    Assert.assertTrue(sourceHandle.isFinished());
-    Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
-
-    sourceHandle.close();
-    Assert.assertTrue(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertTrue(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(sourceHandle);
   }
 
   @Test
@@ -226,7 +220,7 @@ public class SourceHandleTest {
             mockSourceHandleListener,
             mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
 
@@ -245,22 +239,22 @@ public class SourceHandleTest {
                   req ->
                       remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId())
                           && 0 == req.getStartSequenceId()
-                          && 5 == req.getEndSequenceId()));
+                          && 6 == req.getEndSequenceId()));
       Mockito.verify(mockClient, Mockito.times(1))
           .onAcknowledgeDataBlockEvent(
               Mockito.argThat(
                   e ->
                       remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId())
                           && 0 == e.getStartSequenceId()
-                          && 5 == e.getEndSequenceId()));
+                          && 6 == e.getEndSequenceId()));
     } catch (InterruptedException | TException e) {
       e.printStackTrace();
       Assert.fail();
     }
     Assert.assertTrue(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
-    Assert.assertEquals(5 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(6 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
@@ -268,9 +262,9 @@ public class SourceHandleTest {
       sourceHandle.receive();
       try {
         Thread.sleep(100L);
-        if (i < 5) {
-          Assert.assertEquals(5 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
-          final int startSequenceId = 5 + i;
+        if (i < 4) {
+          Assert.assertEquals(6 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
+          final int startSequenceId = 6 + i;
           Mockito.verify(mockClient, Mockito.times(1))
               .getDataBlock(
                   Mockito.argThat(
@@ -299,20 +293,17 @@ public class SourceHandleTest {
       } else {
         Assert.assertFalse(sourceHandle.isBlocked().isDone());
       }
-      Assert.assertFalse(sourceHandle.isClosed());
+      Assert.assertFalse(sourceHandle.isAborted());
       Assert.assertFalse(sourceHandle.isFinished());
     }
 
     // Receive EndOfDataBlock event from upstream fragment instance.
     sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
     Assert.assertTrue(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
-    Assert.assertTrue(sourceHandle.isFinished());
-    Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
-
-    sourceHandle.close();
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertTrue(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(sourceHandle);
   }
 
   @Test
@@ -371,7 +362,7 @@ public class SourceHandleTest {
             mockSourceHandleListener,
             mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
 
@@ -392,7 +383,7 @@ public class SourceHandleTest {
       Assert.fail();
     }
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
 
@@ -422,7 +413,7 @@ public class SourceHandleTest {
       Assert.fail();
     }
     Assert.assertTrue(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(
         numOfMockTsBlock * 2 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
@@ -435,7 +426,7 @@ public class SourceHandleTest {
       } else {
         Assert.assertFalse(sourceHandle.isBlocked().isDone());
       }
-      Assert.assertFalse(sourceHandle.isClosed());
+      Assert.assertFalse(sourceHandle.isAborted());
       Assert.assertFalse(sourceHandle.isFinished());
       Assert.assertEquals(
           (2 * numOfMockTsBlock - 1 - i) * mockTsBlockSize,
@@ -469,7 +460,7 @@ public class SourceHandleTest {
       Assert.fail();
     }
     Assert.assertTrue(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(
         numOfMockTsBlock * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
@@ -482,7 +473,7 @@ public class SourceHandleTest {
       } else {
         Assert.assertFalse(sourceHandle.isBlocked().isDone());
       }
-      Assert.assertFalse(sourceHandle.isClosed());
+      Assert.assertFalse(sourceHandle.isAborted());
       Assert.assertFalse(sourceHandle.isFinished());
       Assert.assertEquals(
           (numOfMockTsBlock - 1 - i) * mockTsBlockSize,
@@ -492,14 +483,10 @@ public class SourceHandleTest {
     // Receive EndOfDataBlock event from upstream fragment instance.
     sourceHandle.setNoMoreTsBlocks(3 * numOfMockTsBlock - 1);
     Assert.assertTrue(sourceHandle.isBlocked().isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
-    Assert.assertTrue(sourceHandle.isFinished());
-    Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
-
-    sourceHandle.close();
-    Assert.assertTrue(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertTrue(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(sourceHandle);
   }
 
   @Test
@@ -551,11 +538,10 @@ public class SourceHandleTest {
             mockClientManager);
     Future<?> blocked = sourceHandle.isBlocked();
     Assert.assertFalse(blocked.isDone());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
 
-    // Receive data blocks from upstream fragment instance.
     // New data blocks event arrived.
     sourceHandle.updatePendingDataBlockInfo(
         0,
@@ -575,10 +561,12 @@ public class SourceHandleTest {
         .onFailure(sourceHandle, mockException);
     Assert.assertFalse(blocked.isDone());
 
-    sourceHandle.close();
+    sourceHandle.abort();
     Assert.assertFalse(sourceHandle.isFinished());
+    Assert.assertTrue(sourceHandle.isAborted());
     Assert.assertTrue(blocked.isDone());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onAborted(sourceHandle);
   }
 
   @Test
@@ -638,16 +626,16 @@ public class SourceHandleTest {
     Future<?> blocked = sourceHandle.isBlocked();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(blocked.isCancelled());
-    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
 
-    sourceHandle.close();
+    sourceHandle.abort();
     Assert.assertTrue(blocked.isDone());
     Assert.assertTrue(blocked.isCancelled());
-    Assert.assertTrue(sourceHandle.isClosed());
+    Assert.assertTrue(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
-    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onClosed(sourceHandle);
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onAborted(sourceHandle);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index ebd863d738..a98152ba24 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -69,10 +69,16 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void setNoMoreTsBlocks() {}
+  public void setNoMoreTsBlocks() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    instanceContext.transitionToFlushing();
+  }
 
   @Override
-  public boolean isClosed() {
+  public boolean isAborted() {
     return closed;
   }
 
@@ -81,17 +87,9 @@ public class StubSinkHandle implements ISinkHandle {
     return false;
   }
 
-  @Override
-  public void close() {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    instanceContext.transitionToFlushing();
-  }
-
   @Override
   public void abort() {
+    closed = true;
     tsBlocks.clear();
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
index c1ca224868..fd4ed074d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
@@ -247,4 +247,29 @@ public class MemoryPoolTest {
     } catch (IllegalArgumentException ignore) {
     }
   }
+
+  @Test
+  public void testTryCancelBlockedReservation() {
+    String queryId = "q0";
+    // Run out of memory.
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+
+    ListenableFuture<Void> f = pool.reserve(queryId, 256L);
+    Assert.assertFalse(f.isDone());
+    // Cancel the reservation.
+    Assert.assertEquals(256L, pool.tryCancel(f));
+    Assert.assertTrue(f.isDone());
+    Assert.assertTrue(f.isCancelled());
+  }
+
+  @Test
+  public void testTryCancelCompletedReservation() {
+    String queryId = "q0";
+    ListenableFuture<Void> f = pool.reserve(queryId, 256L);
+    Assert.assertTrue(f.isDone());
+    // Cancel the reservation.
+    Assert.assertEquals(0L, pool.tryCancel(f));
+    Assert.assertTrue(f.isDone());
+    Assert.assertFalse(f.isCancelled());
+  }
 }