You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/21 09:28:42 UTC
[iotdb] branch master updated: [IOTDB-2971] Fix sink handle memory leak (#5626)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 75620dd4d6 [IOTDB-2971] Fix sink handle memory leak (#5626)
75620dd4d6 is described below
commit 75620dd4d681fd6c6133d8da9e03e669442496d3
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Thu Apr 21 17:28:37 2022 +0800
[IOTDB-2971] Fix sink handle memory leak (#5626)
---
.../iotdb/db/mpp/buffer/DataBlockManager.java | 12 +--
.../apache/iotdb/db/mpp/buffer/ISinkHandle.java | 13 ++-
.../apache/iotdb/db/mpp/buffer/ISourceHandle.java | 10 +-
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 16 ++--
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 13 +--
.../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 101 +++++++++++++++++++--
.../iotdb/db/mpp/buffer/SourceHandleTest.java | 93 +++++++++++++++++--
7 files changed, 216 insertions(+), 42 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 6a8f34ccf5..e09acf28cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -258,7 +258,7 @@ public class DataBlockManager implements IDataBlockManager {
@Override
public ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
- TEndPoint endpoint,
+ TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId,
FragmentInstanceContext instanceContext) {
@@ -274,13 +274,13 @@ public class DataBlockManager implements IDataBlockManager {
SinkHandle sinkHandle =
new SinkHandle(
- endpoint.toString(),
+ remoteEndpoint,
remoteFragmentInstanceId,
remotePlanNodeId,
localFragmentInstanceId,
localMemoryManager,
executorService,
- clientFactory.getDataBlockServiceClient(endpoint),
+ clientFactory.getDataBlockServiceClient(remoteEndpoint),
tsBlockSerdeFactory.get(),
new SinkHandleListenerImpl(instanceContext));
sinkHandles.put(localFragmentInstanceId, sinkHandle);
@@ -291,7 +291,7 @@ public class DataBlockManager implements IDataBlockManager {
public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
- TEndPoint endpoint,
+ TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&& sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
@@ -311,13 +311,13 @@ public class DataBlockManager implements IDataBlockManager {
SourceHandle sourceHandle =
new SourceHandle(
- endpoint.getIp(),
+ remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
localMemoryManager,
executorService,
- clientFactory.getDataBlockServiceClient(endpoint),
+ clientFactory.getDataBlockServiceClient(remoteEndpoint),
tsBlockSerdeFactory.get(),
new SourceHandleListenerImpl());
sourceHandles
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 6300c5beef..b27e86186b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -38,8 +38,8 @@ public interface ISinkHandle extends AutoCloseable {
/**
* Send a list of tsblocks to an unpartitioned output buffer. If no-more-tsblocks has been set,
- * the send tsblock call is ignored. This can happen with limit queries. A {@link
- * RuntimeException} will be thrown if any exception happened * during the data transmission.
+ * the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
+ * will be thrown if any exception happened during the data transmission.
*/
void send(List<TsBlock> tsBlocks) throws IOException;
@@ -57,13 +57,13 @@ public interface ISinkHandle extends AutoCloseable {
void setNoMoreTsBlocks();
/** If the handle is closed. */
- public boolean isClosed();
+ boolean isClosed();
/**
* If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
* instances.
*/
- public boolean isFinished();
+ boolean isFinished();
/**
* Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
@@ -73,6 +73,9 @@ public interface ISinkHandle extends AutoCloseable {
@Override
void close() throws IOException;
- /** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */
+ /**
+ * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
+ * the future returned by {@link #isFull()}.
+ */
void abort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index dfb9257b97..9e31918627 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -39,16 +39,16 @@ public interface ISourceHandle extends Closeable {
/** If there are more tsblocks. */
boolean isFinished();
- /**
- * Get a future that will be completed when the input buffer is not empty. The future will not
- * complete even when the handle is finished or closed.
- */
+ /** Get a future that will be completed when the input buffer is not empty. */
ListenableFuture<Void> isBlocked();
/** If this handle is closed. */
boolean isClosed();
- /** Close the handle. Discarding all tsblocks which may still be in memory buffer. */
+ /**
+ * Close the handle. Discard all tsblocks which may still be in the memory buffer and cancel the
+ * future returned by {@link #isBlocked()}.
+ */
@Override
void close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index a72dcb9cb6..f66d8b49f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -53,7 +54,7 @@ public class SinkHandle implements ISinkHandle {
public static final int MAX_ATTEMPT_TIMES = 3;
- private final String remoteHostname;
+ private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final String remotePlanNodeId;
private final TFragmentInstanceId localFragmentInstanceId;
@@ -76,7 +77,7 @@ public class SinkHandle implements ISinkHandle {
private Throwable throwable;
public SinkHandle(
- String remoteHostname,
+ TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId,
TFragmentInstanceId localFragmentInstanceId,
@@ -85,7 +86,7 @@ public class SinkHandle implements ISinkHandle {
DataBlockService.Iface client,
TsBlockSerde serde,
SinkHandleListener sinkHandleListener) {
- this.remoteHostname = Validate.notNull(remoteHostname);
+ this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
@@ -220,6 +221,9 @@ public class SinkHandle implements ISinkHandle {
synchronized (this) {
sequenceIdToTsBlock.clear();
closed = true;
+ if (blocked != null && !blocked.isDone()) {
+ blocked.cancel(true);
+ }
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
@@ -292,8 +296,8 @@ public class SinkHandle implements ISinkHandle {
localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), freedBytes);
}
- String getRemoteHostname() {
- return remoteHostname;
+ TEndPoint getRemoteEndpoint() {
+ return remoteEndpoint;
}
TFragmentInstanceId getRemoteFragmentInstanceId() {
@@ -311,7 +315,7 @@ public class SinkHandle implements ISinkHandle {
@Override
public String toString() {
return new StringJoiner(", ", SinkHandle.class.getSimpleName() + "[", "]")
- .add("remoteHostname='" + remoteHostname + "'")
+ .add("remoteEndpoint='" + remoteEndpoint + "'")
.add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
.add("remotePlanNodeId='" + remotePlanNodeId + "'")
.add("localFragmentInstanceId=" + localFragmentInstanceId)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 37a5bce1b0..1d6b92d25e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -53,7 +54,7 @@ public class SourceHandle implements ISourceHandle {
public static final int MAX_ATTEMPT_TIMES = 3;
- private final String remoteHostname;
+ private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final TFragmentInstanceId localFragmentInstanceId;
private final String localPlanNodeId;
@@ -75,7 +76,7 @@ public class SourceHandle implements ISourceHandle {
private Throwable throwable;
public SourceHandle(
- String remoteHostname,
+ TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
@@ -84,7 +85,7 @@ public class SourceHandle implements ISourceHandle {
DataBlockService.Iface client,
TsBlockSerde serde,
SourceHandleListener sourceHandleListener) {
- this.remoteHostname = Validate.notNull(remoteHostname);
+ this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
@@ -249,8 +250,8 @@ public class SourceHandle implements ISourceHandle {
return currSequenceId - 1 == lastSequenceId;
}
- String getRemoteHostname() {
- return remoteHostname;
+ TEndPoint getRemoteEndpoint() {
+ return remoteEndpoint;
}
TFragmentInstanceId getRemoteFragmentInstanceId() {
@@ -278,7 +279,7 @@ public class SourceHandle implements ISourceHandle {
@Override
public String toString() {
return new StringJoiner(", ", SourceHandle.class.getSimpleName() + "[", "]")
- .add("remoteHostname='" + remoteHostname + "'")
+ .add("remoteEndpoint='" + remoteEndpoint + "'")
.add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
.add("localFragmentInstanceId=" + localFragmentInstanceId)
.add("localPlanNodeId='" + localPlanNodeId + "'")
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index b47b3c57c8..e8a7767100 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.memory.MemoryPool;
@@ -37,6 +39,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public class SinkHandleTest {
@@ -45,7 +48,9 @@ public class SinkHandleTest {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
- final String remoteHostname = "remote";
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
final String remotePlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
@@ -75,7 +80,7 @@ public class SinkHandleTest {
// Construct SinkHandle.
SinkHandle sinkHandle =
new SinkHandle(
- remoteHostname,
+ remoteEndpoint,
remoteFragmentInstanceId,
remotePlanNodeId,
localFragmentInstanceId,
@@ -174,7 +179,9 @@ public class SinkHandleTest {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
- final String remoteHostname = "remote";
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
final String remotePlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
@@ -206,7 +213,7 @@ public class SinkHandleTest {
// Construct SinkHandle.
SinkHandle sinkHandle =
new SinkHandle(
- remoteHostname,
+ remoteEndpoint,
remoteFragmentInstanceId,
remotePlanNodeId,
localFragmentInstanceId,
@@ -356,7 +363,9 @@ public class SinkHandleTest {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
- final String remoteHostname = "remote";
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
final String remotePlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
@@ -387,7 +396,7 @@ public class SinkHandleTest {
// Construct SinkHandle.
SinkHandle sinkHandle =
new SinkHandle(
- remoteHostname,
+ remoteEndpoint,
remoteFragmentInstanceId,
remotePlanNodeId,
localFragmentInstanceId,
@@ -457,4 +466,84 @@ public class SinkHandleTest {
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle);
Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onFinish(sinkHandle);
}
+
+ @Test
+ public void testAbort() {
+ final String queryId = "q0";
+ final long mockTsBlockSize = 1024L * 1024L;
+ final int numOfMockTsBlock = 10;
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+ final String remotePlanNodeId = "exchange_0";
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+
+ // Construct a mock LocalMemoryManager that returns blocked futures.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool mockMemoryPool =
+ Utils.createMockBlockedMemoryPool(queryId, numOfMockTsBlock, mockTsBlockSize);
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+
+ // Construct a mock SinkHandleListener.
+ SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
+ // Construct several mock TsBlock(s).
+ List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+ // Construct a mock client.
+ Client mockClient = Mockito.mock(Client.class);
+ try {
+ Mockito.doNothing()
+ .when(mockClient)
+ .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
+ Mockito.doNothing()
+ .when(mockClient)
+ .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
+ } catch (TException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
+ // Construct SinkHandle.
+ SinkHandle sinkHandle =
+ new SinkHandle(
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localFragmentInstanceId,
+ mockLocalMemoryManager,
+ Executors.newSingleThreadExecutor(),
+ mockClient,
+ Utils.createMockTsBlockSerde(mockTsBlockSize),
+ mockSinkHandleListener);
+ Assert.assertTrue(sinkHandle.isFull().isDone());
+ Assert.assertFalse(sinkHandle.isFinished());
+ Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
+
+ // Send tsblocks.
+ try {
+ sinkHandle.send(mockTsBlocks);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ Future<?> blocked = sinkHandle.isFull();
+ Assert.assertFalse(blocked.isDone());
+ Assert.assertFalse(blocked.isCancelled());
+ Assert.assertFalse(sinkHandle.isFinished());
+ Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertEquals(
+ mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
+
+ sinkHandle.abort();
+ Assert.assertTrue(blocked.isDone());
+ Assert.assertTrue(blocked.isCancelled());
+ Assert.assertFalse(sinkHandle.isFinished());
+ Assert.assertTrue(sinkHandle.isClosed());
+ Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
+ Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 461fac1e13..dc84cff501 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.buffer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.memory.MemoryPool;
@@ -39,6 +41,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -49,7 +52,9 @@ public class SourceHandleTest {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
- final String remoteHostname = "remote";
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -84,7 +89,7 @@ public class SourceHandleTest {
SourceHandle sourceHandle =
new SourceHandle(
- remoteHostname,
+ remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
@@ -168,7 +173,9 @@ public class SourceHandleTest {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
- final String remoteHostname = "remote";
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -204,7 +211,7 @@ public class SourceHandleTest {
SourceHandle sourceHandle =
new SourceHandle(
- remoteHostname,
+ remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
@@ -313,7 +320,9 @@ public class SourceHandleTest {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
- final String remoteHostname = "remote";
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -348,7 +357,7 @@ public class SourceHandleTest {
SourceHandle sourceHandle =
new SourceHandle(
- remoteHostname,
+ remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
@@ -504,7 +513,9 @@ public class SourceHandleTest {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
- final String remoteHostname = "remote";
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
@@ -530,7 +541,7 @@ public class SourceHandleTest {
SourceHandle sourceHandle =
new SourceHandle(
- remoteHostname,
+ remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
@@ -585,4 +596,70 @@ public class SourceHandleTest {
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
}
+
+ @Test
+ public void testForceClose() {
+ final String queryId = "q0";
+ final long mockTsBlockSize = 1024L * 1024L;
+ final TEndPoint remoteEndpoint =
+ new TEndPoint(
+ "remote", IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort());
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
+ final String localPlanNodeId = "exchange_0";
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
+
+ // Construct a mock LocalMemoryManager that does not block any reservation.
+ LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
+ MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+ // Construct a mock client.
+ Client mockClient = Mockito.mock(Client.class);
+ try {
+ Mockito.doAnswer(
+ invocation -> {
+ TGetDataBlockRequest req = invocation.getArgument(0);
+ List<ByteBuffer> byteBuffers =
+ new ArrayList<>(req.getEndSequenceId() - req.getStartSequenceId());
+ for (int i = 0; i < req.getEndSequenceId() - req.getStartSequenceId(); i++) {
+ byteBuffers.add(ByteBuffer.allocate(0));
+ }
+ return new TGetDataBlockResponse(byteBuffers);
+ })
+ .when(mockClient)
+ .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
+ } catch (TException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ // Construct a mock SourceHandleListener.
+ SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
+ // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
+ TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
+
+ SourceHandle sourceHandle =
+ new SourceHandle(
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ localFragmentInstanceId,
+ localPlanNodeId,
+ mockLocalMemoryManager,
+ Executors.newSingleThreadExecutor(),
+ mockClient,
+ mockTsBlockSerde,
+ mockSourceHandleListener);
+ Future<?> blocked = sourceHandle.isBlocked();
+ Assert.assertFalse(blocked.isDone());
+ Assert.assertFalse(blocked.isCancelled());
+ Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isFinished());
+ Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+
+ sourceHandle.close();
+ Assert.assertTrue(blocked.isDone());
+ Assert.assertTrue(blocked.isCancelled());
+ Assert.assertTrue(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isFinished());
+ Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onClosed(sourceHandle);
+ }
}