You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/28 10:35:22 UTC
[iotdb] branch master updated: [IOTDB-3021] Fix sink/source handle memory leak (#5692)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4c1abdd894 [IOTDB-3021] Fix sink/source handle memory leak (#5692)
4c1abdd894 is described below
commit 4c1abdd894911d9790808105c1e0a5bd194ff88f
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Thu Apr 28 18:35:16 2022 +0800
[IOTDB-3021] Fix sink/source handle memory leak (#5692)
---
.../iotdb/db/mpp/buffer/DataBlockManager.java | 14 +--
.../apache/iotdb/db/mpp/buffer/ISinkHandle.java | 19 ++--
.../apache/iotdb/db/mpp/buffer/ISourceHandle.java | 13 +--
.../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 104 ++++++++-----------
.../apache/iotdb/db/mpp/buffer/SourceHandle.java | 112 ++++++++++-----------
.../org/apache/iotdb/db/mpp/execution/Driver.java | 2 +-
.../iotdb/db/mpp/execution/QueryExecution.java | 4 +-
.../org/apache/iotdb/db/mpp/memory/MemoryPool.java | 25 ++++-
.../db/mpp/operator/source/ExchangeOperator.java | 2 +-
.../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 69 ++++++-------
.../iotdb/db/mpp/buffer/SourceHandleTest.java | 76 ++++++--------
.../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 20 ++--
.../apache/iotdb/db/mpp/memory/MemoryPoolTest.java | 25 +++++
13 files changed, 240 insertions(+), 245 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index e4d38df2f1..20b7be00bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -54,7 +54,7 @@ public class DataBlockManager implements IDataBlockManager {
public interface SourceHandleListener {
void onFinished(ISourceHandle sourceHandle);
- void onClosed(ISourceHandle sourceHandle);
+ void onAborted(ISourceHandle sourceHandle);
void onFailure(ISourceHandle sourceHandle, Throwable t);
}
@@ -62,7 +62,7 @@ public class DataBlockManager implements IDataBlockManager {
public interface SinkHandleListener {
void onFinish(ISinkHandle sinkHandle);
- void onClosed(ISinkHandle sinkHandle);
+ void onEndOfBlocks(ISinkHandle sinkHandle);
void onAborted(ISinkHandle sinkHandle);
@@ -130,7 +130,7 @@ public class DataBlockManager implements IDataBlockManager {
|| sourceHandles
.get(e.getTargetFragmentInstanceId())
.get(e.getTargetPlanNodeId())
- .isClosed()) {
+ .isAborted()) {
throw new TException(
"Target fragment instance not found. Fragment instance ID: "
+ e.getTargetFragmentInstanceId()
@@ -156,7 +156,7 @@ public class DataBlockManager implements IDataBlockManager {
|| sourceHandles
.get(e.getTargetFragmentInstanceId())
.get(e.getTargetPlanNodeId())
- .isClosed()) {
+ .isAborted()) {
throw new TException(
"Target fragment instance not found. Fragment instance ID: "
+ e.getTargetFragmentInstanceId()
@@ -200,7 +200,7 @@ public class DataBlockManager implements IDataBlockManager {
}
@Override
- public void onClosed(ISourceHandle sourceHandle) {
+ public void onAborted(ISourceHandle sourceHandle) {
onFinished(sourceHandle);
}
@@ -236,7 +236,7 @@ public class DataBlockManager implements IDataBlockManager {
}
@Override
- public void onClosed(ISinkHandle sinkHandle) {
+ public void onEndOfBlocks(ISinkHandle sinkHandle) {
context.transitionToFlushing();
}
@@ -379,7 +379,7 @@ public class DataBlockManager implements IDataBlockManager {
Map<String, SourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId);
for (Entry<String, SourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
logger.info("Close source handle {}", sourceHandles);
- entry.getValue().close();
+ entry.getValue().abort();
}
sourceHandles.remove(fragmentInstanceId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index b28b812175..da42a93dc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -51,27 +51,20 @@ public interface ISinkHandle {
void send(int partition, List<TsBlock> tsBlocks);
/**
- * Notify the handle that no more tsblocks will be sent. Any future calls to send a tsblock should
- * be ignored.
+ * Notify the handle that there are no more tsblocks. Any future calls to send a tsblock should be
+ * ignored.
*/
void setNoMoreTsBlocks();
- /** If the handle is closed. */
- boolean isClosed();
+ /** If the handle is aborted. */
+ boolean isAborted();
/**
- * If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
- * instances.
+ * If there are no more tsblocks to be sent and all the tsblocks have been fetched by downstream
+ * fragment instances.
*/
boolean isFinished();
- /**
- * Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
- * downstream instances. A {@link RuntimeException} will be thrown if any exception happened
- * during the data transmission.
- */
- void close();
-
/**
* Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
* the future returned by {@link #isFull()}.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index e9be12d838..1df8f3d18b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
-import java.io.Closeable;
-
-public interface ISourceHandle extends Closeable {
+public interface ISourceHandle {
/** Get the local fragment instance ID that this source handle belongs to. */
TFragmentInstanceId getLocalFragmentInstanceId();
@@ -48,13 +46,12 @@ public interface ISourceHandle extends Closeable {
/** Get a future that will be completed when the input buffer is not empty. */
ListenableFuture<Void> isBlocked();
- /** If this handle is closed. */
- boolean isClosed();
+ /** If this handle is aborted. */
+ boolean isAborted();
/**
- * Close the handle. Discard all tsblocks which may still be in the memory buffer and complete the
+ * Abort the handle. Discard all tsblocks which may still be in the memory buffer and complete the
* future returned by {@link #isBlocked()}.
*/
- @Override
- void close();
+ void abort();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index eea8850869..63c6d89de6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -74,8 +74,10 @@ public class SinkHandle implements ISinkHandle {
private volatile ListenableFuture<Void> blocked = immediateFuture(null);
private int nextSequenceId = 0;
+ /** The actual buffered memory in bytes, including the amount of memory being reserved. */
private long bufferRetainedSizeInBytes = 0;
- private boolean closed = false;
+
+ private boolean aborted = false;
private boolean noMoreTsBlocks = false;
public SinkHandle(
@@ -100,9 +102,9 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public ListenableFuture<Void> isFull() {
- if (closed) {
- throw new IllegalStateException("Sink handle is closed.");
+ public synchronized ListenableFuture<Void> isFull() {
+ if (aborted) {
+ throw new IllegalStateException("Sink handle is aborted.");
}
return nonCancellationPropagating(blocked);
}
@@ -112,10 +114,10 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public void send(List<TsBlock> tsBlocks) {
+ public synchronized void send(List<TsBlock> tsBlocks) {
Validate.notNull(tsBlocks, "tsBlocks is null");
- if (closed) {
- throw new IllegalStateException("Sink handle is closed.");
+ if (aborted) {
+ throw new IllegalStateException("Sink handle is aborted.");
}
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
@@ -123,27 +125,24 @@ public class SinkHandle implements ISinkHandle {
if (noMoreTsBlocks) {
return;
}
-
long retainedSizeInBytes = 0L;
for (TsBlock tsBlock : tsBlocks) {
retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
}
int startSequenceId;
List<Long> tsBlockSizes = new ArrayList<>();
- synchronized (this) {
- startSequenceId = nextSequenceId;
- blocked =
- localMemoryManager
- .getQueryPool()
- .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
- bufferRetainedSizeInBytes += retainedSizeInBytes;
- for (TsBlock tsBlock : tsBlocks) {
- sequenceIdToTsBlock.put(nextSequenceId, tsBlock);
- nextSequenceId += 1;
- }
- for (int i = startSequenceId; i < nextSequenceId; i++) {
- tsBlockSizes.add(sequenceIdToTsBlock.get(i).getRetainedSizeInBytes());
- }
+ startSequenceId = nextSequenceId;
+ blocked =
+ localMemoryManager
+ .getQueryPool()
+ .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
+ bufferRetainedSizeInBytes += retainedSizeInBytes;
+ for (TsBlock tsBlock : tsBlocks) {
+ sequenceIdToTsBlock.put(nextSequenceId, tsBlock);
+ nextSequenceId += 1;
+ }
+ for (int i = startSequenceId; i < nextSequenceId; i++) {
+ tsBlockSizes.add(sequenceIdToTsBlock.get(i).getRetainedSizeInBytes());
}
// TODO: consider merge multiple NewDataBlockEvent for less network traffic.
@@ -151,7 +150,7 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public void send(int partition, List<TsBlock> tsBlocks) {
+ public synchronized void send(int partition, List<TsBlock> tsBlocks) {
throw new UnsupportedOperationException();
}
@@ -210,9 +209,9 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public void close() {
- logger.info("Sink handle {} is being closed.", this);
- if (closed) {
+ public synchronized void setNoMoreTsBlocks() {
+ logger.info("Setting no-more-tsblocks to {}", this);
+ if (aborted) {
return;
}
try {
@@ -220,51 +219,33 @@ public class SinkHandle implements ISinkHandle {
} catch (Exception e) {
throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
}
- synchronized (this) {
- closed = true;
- // synchronized is reentrant lock, wo we can invoke setNoMoreTsBlocks() here.
- setNoMoreTsBlocks();
+ noMoreTsBlocks = true;
+ if (isFinished()) {
+ sinkHandleListener.onFinish(this);
}
- sinkHandleListener.onClosed(this);
- logger.info("Sink handle {} is closed.", this);
+ sinkHandleListener.onEndOfBlocks(this);
+ logger.info("No more tsblocks has been set to {}", this);
}
@Override
- public void abort() {
+ public synchronized void abort() {
logger.info("Sink handle {} is being aborted.", this);
- synchronized (this) {
- sequenceIdToTsBlock.clear();
- closed = true;
- if (blocked != null && !blocked.isDone()) {
- blocked.cancel(true);
- }
- if (bufferRetainedSizeInBytes > 0) {
- localMemoryManager
- .getQueryPool()
- .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
- bufferRetainedSizeInBytes = 0;
- }
+ sequenceIdToTsBlock.clear();
+ aborted = true;
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+ if (bufferRetainedSizeInBytes > 0) {
+ localMemoryManager
+ .getQueryPool()
+ .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+ bufferRetainedSizeInBytes = 0;
}
sinkHandleListener.onAborted(this);
logger.info("Sink handle {} is aborted", this);
}
@Override
- public synchronized void setNoMoreTsBlocks() {
- noMoreTsBlocks = true;
- // In current implementation, the onFinish() is only invoked when receiving the
- // acknowledge event from SourceHandle. If the acknowledge event happens before
- // the close(), the onFinish() won't be invoked and the instance's status will
- // always be FLUSHING. We cannot ensure the sequence of `acknowledge event` and
- // `close` so we need to do following check every time `noMoreTsBlocks` is updated.
- if (isFinished()) {
- sinkHandleListener.onFinish(this);
- }
- }
-
- @Override
- public boolean isClosed() {
- return closed;
+ public boolean isAborted() {
+ return aborted;
}
@Override
@@ -297,6 +278,9 @@ public class SinkHandle implements ISinkHandle {
void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
long freedBytes = 0L;
synchronized (this) {
+ if (aborted) {
+ return;
+ }
Iterator<Entry<Integer, TsBlock>> iterator = sequenceIdToTsBlock.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Integer, TsBlock> entry = iterator.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index 1ebb694b76..6037ab8779 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -69,12 +69,17 @@ public class SourceHandle implements ISourceHandle {
private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
dataBlockServiceClientManager;
- private volatile SettableFuture<Void> blocked = SettableFuture.create();
+ private SettableFuture<Void> blocked = SettableFuture.create();
+
+ private ListenableFuture<Void> blockedOnMemory;
+
+ /** The actual buffered memory in bytes, including the amount of memory being reserved. */
private long bufferRetainedSizeInBytes = 0L;
+
private int currSequenceId = 0;
private int nextSequenceId = 0;
private int lastSequenceId = Integer.MAX_VALUE;
- private boolean closed = false;
+ private boolean aborted = false;
public SourceHandle(
TEndPoint remoteEndpoint,
@@ -99,25 +104,24 @@ public class SourceHandle implements ISourceHandle {
}
@Override
- public TsBlock receive() {
- if (closed) {
- throw new IllegalStateException("Source handle is closed.");
+ public synchronized TsBlock receive() {
+ if (aborted) {
+ throw new IllegalStateException("Source handle is aborted.");
}
if (!blocked.isDone()) {
throw new IllegalStateException("Source handle is blocked.");
}
- TsBlock tsBlock;
- synchronized (this) {
- tsBlock = sequenceIdToTsBlock.remove(currSequenceId);
- currSequenceId += 1;
- bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
- localMemoryManager
- .getQueryPool()
- .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
- if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
- blocked = SettableFuture.create();
- }
+ TsBlock tsBlock;
+ tsBlock = sequenceIdToTsBlock.remove(currSequenceId);
+ currSequenceId += 1;
+ bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+ localMemoryManager
+ .getQueryPool()
+ .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+
+ if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
+ blocked = SettableFuture.create();
}
if (isFinished()) {
sourceHandleListener.onFinished(this);
@@ -127,10 +131,17 @@ public class SourceHandle implements ISourceHandle {
}
private synchronized void trySubmitGetDataBlocksTask() {
+ if (aborted) {
+ return;
+ }
+ if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
+ return;
+ }
+
final int startSequenceId = nextSequenceId;
int endSequenceId = nextSequenceId;
long reservedBytes = 0L;
- ListenableFuture<?> future = null;
+ ListenableFuture<Void> future = null;
while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) {
Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId);
if (bytesToReserve == null) {
@@ -140,11 +151,10 @@ public class SourceHandle implements ISourceHandle {
localMemoryManager
.getQueryPool()
.reserve(localFragmentInstanceId.getQueryId(), bytesToReserve);
- if (future.isDone()) {
- endSequenceId += 1;
- reservedBytes += bytesToReserve;
- bufferRetainedSizeInBytes += bytesToReserve;
- } else {
+ bufferRetainedSizeInBytes += bytesToReserve;
+ endSequenceId += 1;
+ reservedBytes += bytesToReserve;
+ if (!future.isDone()) {
break;
}
}
@@ -154,48 +164,27 @@ public class SourceHandle implements ISourceHandle {
return;
}
- if (future.isDone()) {
- nextSequenceId = endSequenceId;
- executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
- } else {
- nextSequenceId = endSequenceId + 1;
+ nextSequenceId = endSequenceId;
+ executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
+ if (!future.isDone()) {
+ blockedOnMemory = future;
// The future being not completed indicates,
// 1. Memory has been reserved for blocks in [startSequenceId, endSequenceId).
- // 2. Memory reservation for block whose sequence ID equals endSequenceId is blocked.
+ // 2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked.
// 3. Have not reserve memory for the rest of blocks.
//
- // startSequenceId endSequenceId endSequenceId + 1
+ // startSequenceId endSequenceId - 1 endSequenceId
// |-------- reserved --------|--- blocked ---|--- not reserved ---|
- if (endSequenceId > startSequenceId) {
- // Memory has been reserved. Submit a GetDataBlocksTask for these blocks.
- executorService.submit(
- new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
- }
-
- // Submit a GetDataBlocksTask when memory is freed.
- final int sequenceIdOfUnReservedDataBlock = endSequenceId;
- final long sizeOfUnReservedDataBlock = sequenceIdToDataBlockSize.get(endSequenceId);
- future.addListener(
- () -> {
- executorService.submit(
- new GetDataBlocksTask(
- sequenceIdOfUnReservedDataBlock,
- sequenceIdOfUnReservedDataBlock + 1,
- sizeOfUnReservedDataBlock));
- bufferRetainedSizeInBytes += sizeOfUnReservedDataBlock;
- },
- executorService);
-
// Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks.
future.addListener(SourceHandle.this::trySubmitGetDataBlocksTask, executorService);
}
}
@Override
- public ListenableFuture<Void> isBlocked() {
- if (closed) {
- throw new IllegalStateException("Source handle is closed.");
+ public synchronized ListenableFuture<Void> isBlocked() {
+ if (aborted) {
+ throw new IllegalStateException("Source handle is aborted.");
}
return nonCancellationPropagating(blocked);
}
@@ -205,6 +194,9 @@ public class SourceHandle implements ISourceHandle {
if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
blocked.set(null);
}
+ if (isFinished()) {
+ sourceHandleListener.onFinished(this);
+ }
}
synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) {
@@ -215,13 +207,14 @@ public class SourceHandle implements ISourceHandle {
}
@Override
- public synchronized void close() {
- if (closed) {
+ public synchronized void abort() {
+ if (aborted) {
return;
}
if (blocked != null && !blocked.isDone()) {
blocked.cancel(true);
}
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
sequenceIdToDataBlockSize.clear();
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
@@ -229,8 +222,8 @@ public class SourceHandle implements ISourceHandle {
.free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
- closed = true;
- sourceHandleListener.onClosed(this);
+ aborted = true;
+ sourceHandleListener.onAborted(this);
}
@Override
@@ -267,8 +260,8 @@ public class SourceHandle implements ISourceHandle {
}
@Override
- public boolean isClosed() {
- return closed;
+ public boolean isAborted() {
+ return aborted;
}
@Override
@@ -335,7 +328,7 @@ public class SourceHandle implements ISourceHandle {
tsBlocks.add(tsBlock);
}
synchronized (SourceHandle.this) {
- if (closed) {
+ if (aborted) {
return;
}
for (int i = startSequenceId; i < endSequenceId; i++) {
@@ -372,7 +365,6 @@ public class SourceHandle implements ISourceHandle {
}
}
}
- // TODO: try to issue another GetDataBlocksTask to make the query run faster.
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
index ae220e4911..41da1197a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -335,7 +335,7 @@ public abstract class Driver implements IDriver {
try {
root.close();
- sinkHandle.close();
+ sinkHandle.setNoMoreTsBlocks();
} catch (InterruptedException t) {
// don't record the stack
wasInterrupted = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 9a254de2d7..cc5565c0eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -206,7 +206,7 @@ public class QueryExecution implements IQueryExecution {
// 1. The client fetch all the result and the ResultHandle is finished.
// 2. The client's connection is closed that all owned QueryExecution should be cleaned up
if (resultHandle != null && resultHandle.isFinished()) {
- resultHandle.close();
+ resultHandle.abort();
}
}
@@ -220,7 +220,7 @@ public class QueryExecution implements IQueryExecution {
@Override
public TsBlock getBatchResult() {
try {
- if (resultHandle.isClosed() || resultHandle.isFinished()) {
+ if (resultHandle.isAborted() || resultHandle.isFinished()) {
return null;
}
ListenableFuture<Void> blocked = resultHandle.isBlocked();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
index f634b71a66..14950ed94c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
@@ -35,7 +35,7 @@ import java.util.Queue;
/** A thread-safe memory pool. */
public class MemoryPool {
- private static class MemoryReservationFuture<V> extends AbstractFuture<V> {
+ public static class MemoryReservationFuture<V> extends AbstractFuture<V> {
private final String queryId;
private final long bytes;
@@ -133,6 +133,26 @@ public class MemoryPool {
return true;
}
+ /**
+ * Cancel the specified memory reservation. If the reservation has finished, do nothing.
+ *
+ * @param future The future returned from {@link #reserve(String, long)}
+ * @return If the future has not completed, return the number of bytes being reserved. Otherwise,
+ * return 0.
+ */
+ public synchronized long tryCancel(ListenableFuture<Void> future) {
+ if (future.isDone()) {
+ return 0L;
+ }
+
+ Validate.notNull(future);
+ Validate.isTrue(
+ future instanceof MemoryReservationFuture,
+ "invalid future type " + future.getClass().getSimpleName());
+ future.cancel(true);
+ return ((MemoryReservationFuture<Void>) future).getBytes();
+ }
+
public synchronized void free(String queryId, long bytes) {
Validate.notNull(queryId);
Validate.isTrue(bytes > 0L);
@@ -155,12 +175,9 @@ public class MemoryPool {
Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
while (iterator.hasNext()) {
MemoryReservationFuture<Void> future = iterator.next();
-
if (future.isCancelled()) {
- iterator.remove();
continue;
}
-
long bytesToReserve = future.getBytes();
if (maxBytes - reservedBytes < bytesToReserve) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 26d4250acc..9a0b115e03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -81,6 +81,6 @@ public class ExchangeOperator implements SourceOperator {
@Override
public void close() throws Exception {
- sourceHandle.close();
+ sourceHandle.abort();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index 8ac22daac7..376dff6dcb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -95,16 +95,15 @@ public class SinkHandleTest {
mockClientManager);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
sinkHandle.send(mockTsBlocks);
- sinkHandle.setNoMoreTsBlocks();
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
@@ -138,20 +137,23 @@ public class SinkHandleTest {
}
Assert.assertFalse(sinkHandle.isFinished());
+ // Set no-more-tsblocks.
+ sinkHandle.setNoMoreTsBlocks();
+ Assert.assertTrue(sinkHandle.isFull().isDone());
+ Assert.assertFalse(sinkHandle.isFinished());
+ Assert.assertFalse(sinkHandle.isAborted());
+ Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onEndOfBlocks(sinkHandle);
+
// Ack tsblocks.
sinkHandle.acknowledgeTsBlock(0, numOfMockTsBlock);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertTrue(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockMemoryPool, Mockito.times(1))
.free(queryId, numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFinish(sinkHandle);
- // Close the SinkHandle.
- sinkHandle.close();
- Assert.assertTrue(sinkHandle.isClosed());
- Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
try {
Thread.sleep(100L);
Mockito.verify(mockClient, Mockito.times(1))
@@ -222,7 +224,7 @@ public class SinkHandleTest {
mockClientManager);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
@@ -230,7 +232,7 @@ public class SinkHandleTest {
sinkHandle.send(mockTsBlocks);
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
@@ -268,7 +270,7 @@ public class SinkHandleTest {
sinkHandle.acknowledgeTsBlock(0, numOfMockTsBlock);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockMemoryPool, Mockito.times(1))
.free(queryId, numOfMockTsBlock * mockTsBlockSize);
@@ -277,7 +279,7 @@ public class SinkHandleTest {
sinkHandle.send(mockTsBlocks);
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
@@ -299,12 +301,11 @@ public class SinkHandleTest {
Assert.fail();
}
- // Close the SinkHandle.
+ // Set no-more-tsblocks.
sinkHandle.setNoMoreTsBlocks();
Assert.assertFalse(sinkHandle.isFinished());
- sinkHandle.close();
- Assert.assertTrue(sinkHandle.isClosed());
- Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onClosed(sinkHandle);
+ Assert.assertFalse(sinkHandle.isAborted());
+ Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onEndOfBlocks(sinkHandle);
try {
Thread.sleep(100L);
Mockito.verify(mockClient, Mockito.times(1))
@@ -320,7 +321,7 @@ public class SinkHandleTest {
Assert.fail();
}
- // Get tsblocks after the SinkHandle is closed.
+ // Get tsblocks after no-more-tsblocks is set.
for (int i = numOfMockTsBlock; i < numOfMockTsBlock * 2; i++) {
try {
sinkHandle.getSerializedTsBlock(i);
@@ -334,7 +335,7 @@ public class SinkHandleTest {
// Ack tsblocks.
sinkHandle.acknowledgeTsBlock(numOfMockTsBlock, numOfMockTsBlock * 2);
Assert.assertTrue(sinkHandle.isFinished());
- Assert.assertTrue(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(mockMemoryPool, Mockito.times(2))
.free(queryId, numOfMockTsBlock * mockTsBlockSize);
@@ -395,16 +396,15 @@ public class SinkHandleTest {
mockClientManager);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
sinkHandle.send(mockTsBlocks);
- sinkHandle.setNoMoreTsBlocks();
Assert.assertFalse(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
@@ -425,22 +425,21 @@ public class SinkHandleTest {
e.printStackTrace();
Assert.fail();
}
-
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFailure(sinkHandle, mockException);
// Close the SinkHandle.
try {
- sinkHandle.close();
+ sinkHandle.setNoMoreTsBlocks();
Assert.fail("Expect an RuntimeException.");
} catch (RuntimeException e) {
Assert.assertEquals("Send EndOfDataBlockEvent failed", e.getMessage());
}
- Assert.assertFalse(sinkHandle.isClosed());
- Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onClosed(sinkHandle);
+ Assert.assertFalse(sinkHandle.isAborted());
+ Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onEndOfBlocks(sinkHandle);
// Abort the SinkHandle.
sinkHandle.abort();
- Assert.assertTrue(sinkHandle.isClosed());
+ Assert.assertTrue(sinkHandle.isAborted());
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle);
Mockito.verify(mockSinkHandleListener, Mockito.times(0)).onFinish(sinkHandle);
}
@@ -459,9 +458,9 @@ public class SinkHandleTest {
// Construct a mock LocalMemoryManager that returns blocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
- MemoryPool mockMemoryPool =
- Utils.createMockBlockedMemoryPool(queryId, numOfMockTsBlock, mockTsBlockSize);
- Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+ MemoryPool spyMemoryPool =
+ Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 10 * mockTsBlockSize));
+ Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
// Construct a mock SinkHandleListener.
SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
@@ -499,28 +498,30 @@ public class SinkHandleTest {
mockClientManager);
Assert.assertTrue(sinkHandle.isFull().isDone());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
// Send tsblocks.
sinkHandle.send(mockTsBlocks);
+ sinkHandle.send(mockTsBlocks);
Future<?> blocked = sinkHandle.isFull();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(blocked.isCancelled());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertFalse(sinkHandle.isClosed());
+ Assert.assertFalse(sinkHandle.isAborted());
Assert.assertEquals(
- mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
- Assert.assertEquals(numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
+ 2 * mockTsBlockSize * numOfMockTsBlock, sinkHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(2 * numOfMockTsBlock, sinkHandle.getNumOfBufferedTsBlocks());
sinkHandle.abort();
Assert.assertTrue(blocked.isDone());
Assert.assertTrue(blocked.isCancelled());
Assert.assertFalse(sinkHandle.isFinished());
- Assert.assertTrue(sinkHandle.isClosed());
+ Assert.assertTrue(sinkHandle.isAborted());
Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
Assert.assertEquals(0, sinkHandle.getNumOfBufferedTsBlocks());
Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onAborted(sinkHandle);
+ Assert.assertEquals(0L, spyMemoryPool.getQueryMemoryReservedBytes(queryId));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 87d76463db..c8c6c00329 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -103,7 +103,7 @@ public class SourceHandleTest {
mockSourceHandleListener,
mockClientManager);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -134,22 +134,20 @@ public class SourceHandleTest {
Assert.fail();
}
Assert.assertTrue(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
numOfMockTsBlock * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
-
sourceHandle.receive();
-
if (i < numOfMockTsBlock - 1) {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
} else {
Assert.assertFalse(sourceHandle.isBlocked().isDone());
}
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
(numOfMockTsBlock - 1 - i) * mockTsBlockSize,
@@ -159,14 +157,10 @@ public class SourceHandleTest {
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
Assert.assertTrue(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
- Assert.assertTrue(sourceHandle.isFinished());
- Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
-
- sourceHandle.close();
- Assert.assertTrue(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(sourceHandle);
}
@Test
@@ -226,7 +220,7 @@ public class SourceHandleTest {
mockSourceHandleListener,
mockClientManager);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -245,22 +239,22 @@ public class SourceHandleTest {
req ->
remoteFragmentInstanceId.equals(req.getSourceFragmentInstanceId())
&& 0 == req.getStartSequenceId()
- && 5 == req.getEndSequenceId()));
+ && 6 == req.getEndSequenceId()));
Mockito.verify(mockClient, Mockito.times(1))
.onAcknowledgeDataBlockEvent(
Mockito.argThat(
e ->
remoteFragmentInstanceId.equals(e.getSourceFragmentInstanceId())
&& 0 == e.getStartSequenceId()
- && 5 == e.getEndSequenceId()));
+ && 6 == e.getEndSequenceId()));
} catch (InterruptedException | TException e) {
e.printStackTrace();
Assert.fail();
}
Assert.assertTrue(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
- Assert.assertEquals(5 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(6 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
@@ -268,9 +262,9 @@ public class SourceHandleTest {
sourceHandle.receive();
try {
Thread.sleep(100L);
- if (i < 5) {
- Assert.assertEquals(5 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
- final int startSequenceId = 5 + i;
+ if (i < 4) {
+ Assert.assertEquals(6 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
+ final int startSequenceId = 6 + i;
Mockito.verify(mockClient, Mockito.times(1))
.getDataBlock(
Mockito.argThat(
@@ -299,20 +293,17 @@ public class SourceHandleTest {
} else {
Assert.assertFalse(sourceHandle.isBlocked().isDone());
}
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
}
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
Assert.assertTrue(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
- Assert.assertTrue(sourceHandle.isFinished());
- Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
-
- sourceHandle.close();
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(sourceHandle);
}
@Test
@@ -371,7 +362,7 @@ public class SourceHandleTest {
mockSourceHandleListener,
mockClientManager);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -392,7 +383,7 @@ public class SourceHandleTest {
Assert.fail();
}
Assert.assertFalse(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -422,7 +413,7 @@ public class SourceHandleTest {
Assert.fail();
}
Assert.assertTrue(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
numOfMockTsBlock * 2 * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
@@ -435,7 +426,7 @@ public class SourceHandleTest {
} else {
Assert.assertFalse(sourceHandle.isBlocked().isDone());
}
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
(2 * numOfMockTsBlock - 1 - i) * mockTsBlockSize,
@@ -469,7 +460,7 @@ public class SourceHandleTest {
Assert.fail();
}
Assert.assertTrue(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
numOfMockTsBlock * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
@@ -482,7 +473,7 @@ public class SourceHandleTest {
} else {
Assert.assertFalse(sourceHandle.isBlocked().isDone());
}
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
(numOfMockTsBlock - 1 - i) * mockTsBlockSize,
@@ -492,14 +483,10 @@ public class SourceHandleTest {
// Receive EndOfDataBlock event from upstream fragment instance.
sourceHandle.setNoMoreTsBlocks(3 * numOfMockTsBlock - 1);
Assert.assertTrue(sourceHandle.isBlocked().isDone());
- Assert.assertFalse(sourceHandle.isClosed());
- Assert.assertTrue(sourceHandle.isFinished());
- Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
-
- sourceHandle.close();
- Assert.assertTrue(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertTrue(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onFinished(sourceHandle);
}
@Test
@@ -551,11 +538,10 @@ public class SourceHandleTest {
mockClientManager);
Future<?> blocked = sourceHandle.isBlocked();
Assert.assertFalse(blocked.isDone());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
- // Receive data blocks from upstream fragment instance.
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
0,
@@ -575,10 +561,12 @@ public class SourceHandleTest {
.onFailure(sourceHandle, mockException);
Assert.assertFalse(blocked.isDone());
- sourceHandle.close();
+ sourceHandle.abort();
Assert.assertFalse(sourceHandle.isFinished());
+ Assert.assertTrue(sourceHandle.isAborted());
Assert.assertTrue(blocked.isDone());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onAborted(sourceHandle);
}
@Test
@@ -638,16 +626,16 @@ public class SourceHandleTest {
Future<?> blocked = sourceHandle.isBlocked();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(blocked.isCancelled());
- Assert.assertFalse(sourceHandle.isClosed());
+ Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
- sourceHandle.close();
+ sourceHandle.abort();
Assert.assertTrue(blocked.isDone());
Assert.assertTrue(blocked.isCancelled());
- Assert.assertTrue(sourceHandle.isClosed());
+ Assert.assertTrue(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
- Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onClosed(sourceHandle);
+ Mockito.verify(mockSourceHandleListener, Mockito.times(1)).onAborted(sourceHandle);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index ebd863d738..a98152ba24 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -69,10 +69,16 @@ public class StubSinkHandle implements ISinkHandle {
}
@Override
- public void setNoMoreTsBlocks() {}
+ public void setNoMoreTsBlocks() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ instanceContext.transitionToFlushing();
+ }
@Override
- public boolean isClosed() {
+ public boolean isAborted() {
return closed;
}
@@ -81,17 +87,9 @@ public class StubSinkHandle implements ISinkHandle {
return false;
}
- @Override
- public void close() {
- if (closed) {
- return;
- }
- closed = true;
- instanceContext.transitionToFlushing();
- }
-
@Override
public void abort() {
+ closed = true;
tsBlocks.clear();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
index c1ca224868..fd4ed074d7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
@@ -247,4 +247,29 @@ public class MemoryPoolTest {
} catch (IllegalArgumentException ignore) {
}
}
+
+ @Test
+ public void testTryCancelBlockedReservation() {
+ String queryId = "q0";
+ // Run out of memory.
+ Assert.assertTrue(pool.tryReserve(queryId, 512L));
+
+ ListenableFuture<Void> f = pool.reserve(queryId, 256L);
+ Assert.assertFalse(f.isDone());
+ // Cancel the reservation.
+ Assert.assertEquals(256L, pool.tryCancel(f));
+ Assert.assertTrue(f.isDone());
+ Assert.assertTrue(f.isCancelled());
+ }
+
+ @Test
+ public void testTryCancelCompletedReservation() {
+ String queryId = "q0";
+ ListenableFuture<Void> f = pool.reserve(queryId, 256L);
+ Assert.assertTrue(f.isDone());
+ // Cancel the reservation.
+ Assert.assertEquals(0L, pool.tryCancel(f));
+ Assert.assertTrue(f.isDone());
+ Assert.assertFalse(f.isCancelled());
+ }
}