You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/21 09:28:42 UTC

[iotdb] branch master updated: [IOTDB-2971] Fix sink handle memory leak (#5626)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 75620dd4d6 [IOTDB-2971] Fix sink handle memory leak (#5626)
75620dd4d6 is described below

commit 75620dd4d681fd6c6133d8da9e03e669442496d3
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Thu Apr 21 17:28:37 2022 +0800

    [IOTDB-2971] Fix sink handle memory leak (#5626)
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  12 +--
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |  13 ++-
 .../apache/iotdb/db/mpp/buffer/ISourceHandle.java  |  10 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  16 ++--
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  13 +--
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 101 +++++++++++++++++++--
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |  93 +++++++++++++++++--
 7 files changed, 216 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 6a8f34ccf5..e09acf28cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -258,7 +258,7 @@ public class DataBlockManager implements IDataBlockManager {
   @Override
   public ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
-      TEndPoint endpoint,
+      TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       String remotePlanNodeId,
       FragmentInstanceContext instanceContext) {
@@ -274,13 +274,13 @@ public class DataBlockManager implements IDataBlockManager {
 
     SinkHandle sinkHandle =
         new SinkHandle(
-            endpoint.toString(),
+            remoteEndpoint,
             remoteFragmentInstanceId,
             remotePlanNodeId,
             localFragmentInstanceId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(endpoint),
+            clientFactory.getDataBlockServiceClient(remoteEndpoint),
             tsBlockSerdeFactory.get(),
             new SinkHandleListenerImpl(instanceContext));
     sinkHandles.put(localFragmentInstanceId, sinkHandle);
@@ -291,7 +291,7 @@ public class DataBlockManager implements IDataBlockManager {
   public ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
-      TEndPoint endpoint,
+      TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId) {
     if (sourceHandles.containsKey(localFragmentInstanceId)
         && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
@@ -311,13 +311,13 @@ public class DataBlockManager implements IDataBlockManager {
 
     SourceHandle sourceHandle =
         new SourceHandle(
-            endpoint.getIp(),
+            remoteEndpoint,
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(endpoint),
+            clientFactory.getDataBlockServiceClient(remoteEndpoint),
             tsBlockSerdeFactory.get(),
             new SourceHandleListenerImpl());
     sourceHandles
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 6300c5beef..b27e86186b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -38,8 +38,8 @@ public interface ISinkHandle extends AutoCloseable {
 
   /**
    * Send a list of tsblocks to an unpartitioned output buffer. If no-more-tsblocks has been set,
-   * the send tsblock call is ignored. This can happen with limit queries. A {@link
-   * RuntimeException} will be thrown if any exception happened * during the data transmission.
+   * the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
+   * will be thrown if any exception happened during the data transmission.
    */
   void send(List<TsBlock> tsBlocks) throws IOException;
 
@@ -57,13 +57,13 @@ public interface ISinkHandle extends AutoCloseable {
   void setNoMoreTsBlocks();
 
   /** If the handle is closed. */
-  public boolean isClosed();
+  boolean isClosed();
 
   /**
    * If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
    * instances.
    */
-  public boolean isFinished();
+  boolean isFinished();
 
   /**
    * Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
@@ -73,6 +73,9 @@ public interface ISinkHandle extends AutoCloseable {
   @Override
   void close() throws IOException;
 
-  /** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */
+  /**
+   * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
+   * the future returned by {@link #isFull()}.
+   */
   void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index dfb9257b97..9e31918627 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -39,16 +39,16 @@ public interface ISourceHandle extends Closeable {
   /** If there are more tsblocks. */
   boolean isFinished();
 
-  /**
-   * Get a future that will be completed when the input buffer is not empty. The future will not
-   * complete even when the handle is finished or closed.
-   */
+  /** Get a future that will be completed when the input buffer is not empty. */
   ListenableFuture<Void> isBlocked();
 
   /** If this handle is closed. */
   boolean isClosed();
 
-  /** Close the handle. Discarding all tsblocks which may still be in memory buffer. */
+  /**
+   * Close the handle. Discard all tsblocks which may still be in the memory buffer and complete the
+   * future returned by {@link #isBlocked()}.
+   */
   @Override
   void close();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index a72dcb9cb6..f66d8b49f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -53,7 +54,7 @@ public class SinkHandle implements ISinkHandle {
 
   public static final int MAX_ATTEMPT_TIMES = 3;
 
-  private final String remoteHostname;
+  private final TEndPoint remoteEndpoint;
   private final TFragmentInstanceId remoteFragmentInstanceId;
   private final String remotePlanNodeId;
   private final TFragmentInstanceId localFragmentInstanceId;
@@ -76,7 +77,7 @@ public class SinkHandle implements ISinkHandle {
   private Throwable throwable;
 
   public SinkHandle(
-      String remoteHostname,
+      TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       String remotePlanNodeId,
       TFragmentInstanceId localFragmentInstanceId,
@@ -85,7 +86,7 @@ public class SinkHandle implements ISinkHandle {
       DataBlockService.Iface client,
       TsBlockSerde serde,
       SinkHandleListener sinkHandleListener) {
-    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
@@ -220,6 +221,9 @@ public class SinkHandle implements ISinkHandle {
     synchronized (this) {
       sequenceIdToTsBlock.clear();
       closed = true;
+      if (blocked != null && !blocked.isDone()) {
+        blocked.cancel(true);
+      }
       if (bufferRetainedSizeInBytes > 0) {
         localMemoryManager
             .getQueryPool()
@@ -292,8 +296,8 @@ public class SinkHandle implements ISinkHandle {
     localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), freedBytes);
   }
 
-  String getRemoteHostname() {
-    return remoteHostname;
+  TEndPoint getRemoteEndpoint() {
+    return remoteEndpoint;
   }
 
   TFragmentInstanceId getRemoteFragmentInstanceId() {
@@ -311,7 +315,7 @@ public class SinkHandle implements ISinkHandle {
   @Override
   public String toString() {
     return new StringJoiner(", ", SinkHandle.class.getSimpleName() + "[", "]")
-        .add("remoteHostname='" + remoteHostname + "'")
+        .add("remoteEndpoint='" + remoteEndpoint + "'")
         .add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
         .add("remotePlanNodeId='" + remotePlanNodeId + "'")
         .add("localFragmentInstanceId=" + localFragmentInstanceId)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 37a5bce1b0..1d6b92d25e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -53,7 +54,7 @@ public class SourceHandle implements ISourceHandle {
 
   public static final int MAX_ATTEMPT_TIMES = 3;
 
-  private final String remoteHostname;
+  private final TEndPoint remoteEndpoint;
   private final TFragmentInstanceId remoteFragmentInstanceId;
   private final TFragmentInstanceId localFragmentInstanceId;
   private final String localPlanNodeId;
@@ -75,7 +76,7 @@ public class SourceHandle implements ISourceHandle {
   private Throwable throwable;
 
   public SourceHandle(
-      String remoteHostname,
+      TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
@@ -84,7 +85,7 @@ public class SourceHandle implements ISourceHandle {
       DataBlockService.Iface client,
       TsBlockSerde serde,
       SourceHandleListener sourceHandleListener) {
-    this.remoteHostname = Validate.notNull(remoteHostname);
+    this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
@@ -249,8 +250,8 @@ public class SourceHandle implements ISourceHandle {
     return currSequenceId - 1 == lastSequenceId;
   }
 
-  String getRemoteHostname() {
-    return remoteHostname;
+  TEndPoint getRemoteEndpoint() {
+    return remoteEndpoint;
   }
 
   TFragmentInstanceId getRemoteFragmentInstanceId() {
@@ -278,7 +279,7 @@ public class SourceHandle implements ISourceHandle {
   @Override
   public String toString() {
     return new StringJoiner(", ", SourceHandle.class.getSimpleName() + "[", "]")
-        .add("remoteHostname='" + remoteHostname + "'")
+        .add("remoteEndpoint='" + remoteEndpoint + "'")
         .add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
         .add("localFragmentInstanceId=" + localFragmentInstanceId)
         .add("localPlanNodeId='" + localPlanNodeId + "'")
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index b47b3c57c8..e8a7767100 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.memory.MemoryPool;
@@ -37,6 +39,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class SinkHandleTest {
 
@@ -45,7 +48,9 @@ public class SinkHandleTest {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
-    final String remoteHostname = "remote";
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
     final String remotePlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
@@ -75,7 +80,7 @@ public class SinkHandleTest {
     // Construct SinkHandle.
     SinkHandle sinkHandle =
         new SinkHandle(
-            remoteHostname,
+            remoteEndpoint,
             remoteFragmentInstanceId,
             remotePlanNodeId,
             localFragmentInstanceId,
@@ -174,7 +179,9 @@ public class SinkHandleTest {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
-    final String remoteHostname = "remote";
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
     final String remotePlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
@@ -206,7 +213,7 @@ public class SinkHandleTest {
     // Construct SinkHandle.
     SinkHandle sinkHandle =
         new SinkHandle(
-            remoteHostname,
+            remoteEndpoint,
             remoteFragmentInstanceId,
             remotePlanNodeId,
             localFragmentInstanceId,
@@ -356,7 +363,9 @@ public class SinkHandleTest {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
-    final String remoteHostname = "remote";
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
     final String remotePlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
@@ -387,7 +396,7 @@ public class SinkHandleTest {
     // Construct SinkHandle.
     SinkHandle sinkHandle =
         new SinkHandle(
-            remoteHostname,
+            remoteEndpoint,
             remoteFragmentInstanceId,
             remotePlanNodeId,
             localFragmentInstanceId,
@@ -457,4 +466,84 @@ public class SinkHandleTest {
     Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle);
     Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onFinish(sinkHandle);
   }
+
+  @Test
+  public void testAbort() {
+    final String queryId = "q0";
+    final long mockTsBlockSize = 1024L * 1024L;
+    final int numOfMockTsBlock = 10;
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+    final String remotePlanNodeId = "exchange_0";
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+
+    // Construct a mock LocalMemoryManager that returns blocked futures.
+    LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+    MemoryPool mockMemoryPool =
+        Utils.createMockBlockedMemoryPool(queryId, numOfMockTsBlock, mockTsBlockSize);
+    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+
+    // Construct a mock SinkHandleListener.
+    SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
+    // Construct several mock TsBlock(s).
+    List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+    // Construct a mock client.
+    Client mockClient = Mockito.mock(Client.class);
+    try {
+      Mockito.doNothing()
+          .when(mockClient)
+          .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
+      Mockito.doNothing()
+          .when(mockClient)
+          .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
+    } catch (TException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+
+    // Construct SinkHandle.
+    SinkHandle sinkHandle =
+        new SinkHandle(
+            remoteEndpoint,
+            remoteFragmentInstanceId,
+            remotePlanNodeId,
+            localFragmentInstanceId,
+            mockLocalMemoryManager,
+            Executors.newSingleThreadExecutor(),
+            mockClient,
+            Utils.createMockTsBlockSerde(mockTsBlockSize),
+            mockSinkHandleListener);
+    Assert.assertTrue(sinkHandle.isFull().isDone());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
+
+    // Send tsblocks.
+    try {
+      sinkHandle.send(mockTsBlocks);
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    Future<?> blocked = sinkHandle.isFull();
+    Assert.assertFalse(blocked.isDone());
+    Assert.assertFalse(blocked.isCancelled());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertFalse(sinkHandle.isClosed());
+    Assert.assertEquals(
+        mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
+
+    sinkHandle.abort();
+    Assert.assertTrue(blocked.isDone());
+    Assert.assertTrue(blocked.isCancelled());
+    Assert.assertFalse(sinkHandle.isFinished());
+    Assert.assertTrue(sinkHandle.isClosed());
+    Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
+    Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle);
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 461fac1e13..dc84cff501 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.memory.MemoryPool;
@@ -39,6 +41,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -49,7 +52,9 @@ public class SourceHandleTest {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
-    final String remoteHostname = "remote";
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
     final String localPlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -84,7 +89,7 @@ public class SourceHandleTest {
 
     SourceHandle sourceHandle =
         new SourceHandle(
-            remoteHostname,
+            remoteEndpoint,
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
@@ -168,7 +173,9 @@ public class SourceHandleTest {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
-    final String remoteHostname = "remote";
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
     final String localPlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -204,7 +211,7 @@ public class SourceHandleTest {
 
     SourceHandle sourceHandle =
         new SourceHandle(
-            remoteHostname,
+            remoteEndpoint,
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
@@ -313,7 +320,9 @@ public class SourceHandleTest {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
-    final String remoteHostname = "remote";
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
     final String localPlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -348,7 +357,7 @@ public class SourceHandleTest {
 
     SourceHandle sourceHandle =
         new SourceHandle(
-            remoteHostname,
+            remoteEndpoint,
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
@@ -504,7 +513,9 @@ public class SourceHandleTest {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
-    final String remoteHostname = "remote";
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
     final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
     final String localPlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -530,7 +541,7 @@ public class SourceHandleTest {
 
     SourceHandle sourceHandle =
         new SourceHandle(
-            remoteHostname,
+            remoteEndpoint,
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
@@ -585,4 +596,70 @@ public class SourceHandleTest {
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
   }
+
+  @Test
+  public void testForceClose() {
+    final String queryId = "q0";
+    final long mockTsBlockSize = 1024L * 1024L;
+    final TEndPoint remoteEndpoint =
+        new TEndPoint(
+            "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+    final String localPlanNodeId = "exchange_0";
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+
+    // Construct a mock LocalMemoryManager that do not block any reservation.
+    LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+    MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
+    Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    // Construct a mock client.
+    Client mockClient = Mockito.mock(Client.class);
+    try {
+      Mockito.doAnswer(
+              invocation -> {
+                TGetDataBlockRequest req = invocation.getArgument(0);
+                List<ByteBuffer> byteBuffers =
+                    new ArrayList<>(req.getEndSequenceId() - req.getStartSequenceId());
+                for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); i++) {
+                  byteBuffers.add(ByteBuffer.allocate(0));
+                }
+                return new TGetDataBlockResponse(byteBuffers);
+              })
+          .when(mockClient)
+          .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
+    } catch (TException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    // Construct a mock SourceHandleListener.
+    SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
+    // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
+    TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
+
+    SourceHandle sourceHandle =
+        new SourceHandle(
+            remoteEndpoint,
+            remoteFragmentInstanceId,
+            localFragmentInstanceId,
+            localPlanNodeId,
+            mockLocalMemoryManager,
+            Executors.newSingleThreadExecutor(),
+            mockClient,
+            mockTsBlockSerde,
+            mockSourceHandleListener);
+    Future<?> blocked = sourceHandle.isBlocked();
+    Assert.assertFalse(blocked.isDone());
+    Assert.assertFalse(blocked.isCancelled());
+    Assert.assertFalse(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isFinished());
+    Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+
+    sourceHandle.close();
+    Assert.assertTrue(blocked.isDone());
+    Assert.assertTrue(blocked.isCancelled());
+    Assert.assertTrue(sourceHandle.isClosed());
+    Assert.assertFalse(sourceHandle.isFinished());
+    Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onClosed(sourceHandle);
+  }
 }