You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/04 01:17:46 UTC

[iotdb] branch master updated: [IOTDB-4017] Fix Error happened while calling onAcknowledgeDataBlockEvent (#6869)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ef5936d56 [IOTDB-4017] Fix Error happened while calling onAcknowledgeDataBlockEvent (#6869)
5ef5936d56 is described below

commit 5ef5936d564bb401764e72001334c24681b85983
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu Aug 4 09:17:40 2022 +0800

    [IOTDB-4017] Fix Error happened while calling onAcknowledgeDataBlockEvent (#6869)
---
 .../iotdb/db/mpp/execution/driver/Driver.java      |  3 +-
 .../db/mpp/execution/exchange/ISinkHandle.java     |  2 +-
 .../db/mpp/execution/exchange/LocalSinkHandle.java | 10 ++--
 .../db/mpp/execution/exchange/SinkHandle.java      | 52 +++++++++-------
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  |  2 +-
 .../execution/exchange/LocalSinkHandleTest.java    |  6 +-
 .../db/mpp/execution/exchange/SinkHandleTest.java  | 70 +++++++++++++---------
 .../db/mpp/execution/exchange/StubSinkHandle.java  |  4 +-
 .../iotdb/db/mpp/execution/exchange/Utils.java     | 11 +++-
 9 files changed, 93 insertions(+), 67 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index acf8e89470..3462b0fc7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -186,7 +185,7 @@ public abstract class Driver implements IDriver {
       if (root.hasNext()) {
         TsBlock tsBlock = root.next();
         if (tsBlock != null && !tsBlock.isEmpty()) {
-          sinkHandle.send(Collections.singletonList(tsBlock));
+          sinkHandle.send(tsBlock);
         }
       }
       return NOT_BLOCKED;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index 4450ca8c21..406ea5d625 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -41,7 +41,7 @@ public interface ISinkHandle {
    * the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
    * will be thrown if any exception happened during the data transmission.
    */
-  void send(List<TsBlock> tsBlocks);
+  void send(TsBlock tsBlock);
 
   /**
    * Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index d6960e7664..aaba4b8d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -100,8 +100,8 @@ public class LocalSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) {
-    Validate.notNull(tsBlocks, "tsBlocks is null");
+  public void send(TsBlock tsBlock) {
+    Validate.notNull(tsBlock, "tsBlocks is null");
     synchronized (this) {
       checkState();
       if (!blocked.isDone()) {
@@ -113,11 +113,9 @@ public class LocalSinkHandle implements ISinkHandle {
       if (queue.hasNoMoreTsBlocks()) {
         return;
       }
-      logger.info("send TsBlocks. Size: {}", tsBlocks.size());
+      logger.info("send TsBlocks");
       synchronized (this) {
-        for (TsBlock tsBlock : tsBlocks) {
-          blocked = queue.add(tsBlock);
-        }
+        blocked = queue.add(tsBlock);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index e8247cc621..872720fb7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.concurrent.SetThreadName;
@@ -46,9 +47,9 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 
-import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SinkHandle implements ISinkHandle {
 
@@ -71,15 +72,19 @@ public class SinkHandle implements ISinkHandle {
   // Use LinkedHashMap to meet 2 needs,
   //   1. Predictable iteration order so that removing buffered tsblocks can be efficient.
   //   2. Fast lookup.
-  private final LinkedHashMap<Integer, TsBlock> sequenceIdToTsBlock = new LinkedHashMap<>();
+  private final LinkedHashMap<Integer, Pair<TsBlock, Long>> sequenceIdToTsBlock =
+      new LinkedHashMap<>();
+
+  // size for current TsBlock to reserve and free
+  private long currentTsBlockSize;
 
   private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
       mppDataExchangeServiceClientManager;
 
-  private volatile ListenableFuture<Void> blocked = immediateFuture(null);
+  private volatile ListenableFuture<Void> blocked;
   private int nextSequenceId = 0;
   /** The actual buffered memory in bytes, including the amount of memory being reserved. */
-  private long bufferRetainedSizeInBytes = 0;
+  private long bufferRetainedSizeInBytes;
 
   private boolean aborted = false;
 
@@ -109,6 +114,12 @@ public class SinkHandle implements ISinkHandle {
     this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
     this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
     this.threadName = createFullIdFrom(localFragmentInstanceId, "SinkHandle");
+    this.blocked =
+        localMemoryManager
+            .getQueryPool()
+            .reserve(localFragmentInstanceId.getQueryId(), DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+    this.currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
 
   @Override
@@ -122,8 +133,8 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public synchronized void send(List<TsBlock> tsBlocks) {
-    Validate.notNull(tsBlocks, "tsBlocks is null");
+  public synchronized void send(TsBlock tsBlock) {
+    Validate.notNull(tsBlock, "tsBlocks is null");
     checkState();
     if (!blocked.isDone()) {
       throw new IllegalStateException("Sink handle is blocked.");
@@ -131,10 +142,7 @@ public class SinkHandle implements ISinkHandle {
     if (noMoreTsBlocks) {
       return;
     }
-    long retainedSizeInBytes = 0L;
-    for (TsBlock tsBlock : tsBlocks) {
-      retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
-    }
+    long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
     int startSequenceId;
     List<Long> tsBlockSizes = new ArrayList<>();
     startSequenceId = nextSequenceId;
@@ -143,12 +151,13 @@ public class SinkHandle implements ISinkHandle {
             .getQueryPool()
             .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
     bufferRetainedSizeInBytes += retainedSizeInBytes;
-    for (TsBlock tsBlock : tsBlocks) {
-      sequenceIdToTsBlock.put(nextSequenceId, tsBlock);
-      nextSequenceId += 1;
-    }
+
+    sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
+    nextSequenceId += 1;
+    currentTsBlockSize = retainedSizeInBytes;
+
     for (int i = startSequenceId; i < nextSequenceId; i++) {
-      tsBlockSizes.add(sequenceIdToTsBlock.get(i).getRetainedSizeInBytes());
+      tsBlockSizes.add(sequenceIdToTsBlock.get(i).left.getRetainedSizeInBytes());
     }
 
     // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
@@ -198,6 +207,7 @@ public class SinkHandle implements ISinkHandle {
     }
     logger.info("set noMoreTsBlocks to true");
     noMoreTsBlocks = true;
+
     if (isFinished()) {
       logger.info("revoke onFinish() of sinkHandleListener");
       sinkHandleListener.onFinish(this);
@@ -263,7 +273,7 @@ public class SinkHandle implements ISinkHandle {
 
   ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
     TsBlock tsBlock;
-    tsBlock = sequenceIdToTsBlock.get(sequenceId);
+    tsBlock = sequenceIdToTsBlock.get(sequenceId).left;
     if (tsBlock == null) {
       throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
     }
@@ -276,17 +286,19 @@ public class SinkHandle implements ISinkHandle {
       if (aborted) {
         return;
       }
-      Iterator<Entry<Integer, TsBlock>> iterator = sequenceIdToTsBlock.entrySet().iterator();
+      Iterator<Entry<Integer, Pair<TsBlock, Long>>> iterator =
+          sequenceIdToTsBlock.entrySet().iterator();
       while (iterator.hasNext()) {
-        Entry<Integer, TsBlock> entry = iterator.next();
+        Entry<Integer, Pair<TsBlock, Long>> entry = iterator.next();
         if (entry.getKey() < startSequenceId) {
           continue;
         }
         if (entry.getKey() >= endSequenceId) {
           break;
         }
-        freedBytes += entry.getValue().getRetainedSizeInBytes();
-        bufferRetainedSizeInBytes -= entry.getValue().getRetainedSizeInBytes();
+
+        freedBytes += entry.getValue().right;
+        bufferRetainedSizeInBytes -= entry.getValue().right;
         iterator.remove();
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index e3f4d0fc66..7b24d32c13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -195,7 +195,7 @@ public class MemoryPool {
     Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
     while (iterator.hasNext()) {
       MemoryReservationFuture<Void> future = iterator.next();
-      if (future.isCancelled()) {
+      if (future.isCancelled() || future.isDone()) {
         continue;
       }
       long bytesToReserve = future.getBytes();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 422139a959..1c3cb4e659 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -29,8 +29,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.Collections;
-
 public class LocalSinkHandleTest {
   @Test
   public void testSend() {
@@ -68,7 +66,7 @@ public class LocalSinkHandleTest {
     // Send TsBlocks.
     int numOfSentTsblocks = 0;
     while (localSinkHandle.isFull().isDone()) {
-      localSinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(mockTsBlockSize)));
+      localSinkHandle.send(Utils.createMockTsBlock(mockTsBlockSize));
       numOfSentTsblocks += 1;
     }
     Assert.assertEquals(6, numOfSentTsblocks);
@@ -133,7 +131,7 @@ public class LocalSinkHandleTest {
     // Send TsBlocks.
     int numOfSentTsblocks = 0;
     while (localSinkHandle.isFull().isDone()) {
-      localSinkHandle.send(Collections.singletonList(Utils.createMockTsBlock(mockTsBlockSize)));
+      localSinkHandle.send(Utils.createMockTsBlock(mockTsBlockSize));
       numOfSentTsblocks += 1;
     }
     Assert.assertEquals(6, numOfSentTsblocks);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index f8110fcf1d..411370c14a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -41,13 +41,15 @@ import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
 public class SinkHandleTest {
 
   @Test
   public void testOneTimeNotBlockedSend() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
-    final int numOfMockTsBlock = 10;
+    final int numOfMockTsBlock = 1;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -95,18 +97,20 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
-    Mockito.verify(mockMemoryPool, Mockito.times(1))
+    Mockito.verify(mockMemoryPool, Mockito.times(2))
         .reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
     try {
       Thread.sleep(100L);
@@ -148,7 +152,7 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertTrue(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(mockTsBlockSize, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(1))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
@@ -173,7 +177,7 @@ public class SinkHandleTest {
   public void testMultiTimesBlockedSend() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
-    final int numOfMockTsBlock = 10;
+    final int numOfMockTsBlock = 1;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -223,18 +227,20 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
-    Mockito.verify(mockMemoryPool, Mockito.times(1))
+    Mockito.verify(mockMemoryPool, Mockito.times(2))
         .reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
     try {
       Thread.sleep(100L);
@@ -269,19 +275,21 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(1))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
-    Mockito.verify(mockMemoryPool, Mockito.times(2))
+    Mockito.verify(mockMemoryPool, Mockito.times(3))
         .reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
     try {
       Thread.sleep(100L);
@@ -334,7 +342,8 @@ public class SinkHandleTest {
     sinkHandle.acknowledgeTsBlock(numOfMockTsBlock, numOfMockTsBlock * 2);
     Assert.assertTrue(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(mockMemoryPool, Mockito.times(2))
         .free(queryId, numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
@@ -344,7 +353,7 @@ public class SinkHandleTest {
   public void testFailedSend() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
-    final int numOfMockTsBlock = 10;
+    final int numOfMockTsBlock = 1;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -395,18 +404,20 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Assert.assertFalse(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
-    Mockito.verify(mockMemoryPool, Mockito.times(1))
+    Mockito.verify(mockMemoryPool, Mockito.times(2))
         .reserve(queryId, mockTsBlockSize * numOfMockTsBlock);
     try {
       Thread.sleep(100L);
@@ -446,7 +457,7 @@ public class SinkHandleTest {
   public void testAbort() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
-    final int numOfMockTsBlock = 10;
+    final int numOfMockTsBlock = 1;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -456,7 +467,9 @@ public class SinkHandleTest {
     // Construct a mock LocalMemoryManager that returns blocked futures.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool spyMemoryPool =
-        Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 10 * mockTsBlockSize));
+        Mockito.spy(
+            new MemoryPool(
+                "test", numOfMockTsBlock * mockTsBlockSize, numOfMockTsBlock * mockTsBlockSize));
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
 
     // Construct a mock SinkHandleListener.
@@ -496,20 +509,21 @@ public class SinkHandleTest {
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
-    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
     Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
 
     // Send tsblocks.
-    sinkHandle.send(mockTsBlocks);
-    sinkHandle.send(mockTsBlocks);
+    sinkHandle.send(mockTsBlocks.get(0));
     Future<?> blocked = sinkHandle.isFull();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(blocked.isCancelled());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isAborted());
     Assert.assertEquals(
-        2 * mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
-    Assert.assertEquals(2 * numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
+        mockTsBlockSize * numOfMockTsBlock + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
 
     sinkHandle.abort();
     Assert.assertTrue(blocked.isDone());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index d68ad6726a..2930abe48b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -59,8 +59,8 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(List<TsBlock> tsBlocks) {
-    this.tsBlocks.addAll(tsBlocks);
+  public void send(TsBlock tsBlock) {
+    this.tsBlocks.add(tsBlock);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
index 211f635636..165f0e65a9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/Utils.java
@@ -54,6 +54,7 @@ public class Utils {
 
   public static MemoryPool createMockBlockedMemoryPool(
       String queryId, int numOfMockTsBlock, long mockTsBlockSize) {
+    long capacityInBytes = numOfMockTsBlock * mockTsBlockSize;
     MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
     AtomicReference<SettableFuture<Void>> settableFuture = new AtomicReference<>();
     settableFuture.set(SettableFuture.create());
@@ -62,8 +63,12 @@ public class Utils {
     Mockito.when(mockMemoryPool.reserve(Mockito.eq(queryId), Mockito.anyLong()))
         .thenAnswer(
             invocation -> {
-              reservedBytes.updateAndGet(v -> v + (long) invocation.getArgument(1));
-              settableFuture.set(SettableFuture.create());
+              long bytesToReserve = invocation.getArgument(1);
+              if (reservedBytes.get() + bytesToReserve <= capacityInBytes) {
+                reservedBytes.updateAndGet(v -> v + (long) invocation.getArgument(1));
+              } else {
+                settableFuture.set(SettableFuture.create());
+              }
               return settableFuture.get();
             });
     Mockito.doAnswer(
@@ -72,12 +77,12 @@ public class Utils {
                   reservedBytes.updateAndGet(v -> v - (long) invocation.getArgument(1));
                   if (reservedBytes.get() <= 0) {
                     settableFuture.get().set(null);
+                    reservedBytes.updateAndGet(v -> v + mockTsBlockSize);
                   }
                   return null;
                 })
         .when(mockMemoryPool)
         .free(Mockito.eq(queryId), Mockito.anyLong());
-    long capacityInBytes = numOfMockTsBlock * mockTsBlockSize;
     Mockito.when(mockMemoryPool.tryReserve(Mockito.eq(queryId), Mockito.anyLong()))
         .thenAnswer(
             invocation -> {