You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/24 09:20:57 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] remove check of closed state in ISink
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 818f8e3738 [To rel/1.1] remove check of closed state in ISink
818f8e3738 is described below
commit 818f8e373859ac93bc5b6f80eab399f2b2304b46
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Apr 24 17:20:51 2023 +0800
[To rel/1.1] remove check of closed state in ISink
---
.../iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java | 7 +++++++
.../iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java | 7 ++++---
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index 0e434aae80..d24c669c5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
@@ -83,6 +84,9 @@ public class LocalSinkChannel implements ISinkChannel {
@Override
public synchronized ListenableFuture<?> isFull() {
checkState();
+ if (closed) {
+ return immediateVoidFuture();
+ }
return nonCancellationPropagating(blocked);
}
@@ -115,6 +119,9 @@ public class LocalSinkChannel implements ISinkChannel {
Validate.notNull(tsBlock, "tsBlocks is null");
synchronized (this) {
checkState();
+ if (closed) {
+ return;
+ }
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index b1e155b22f..11c69a2ec3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -114,9 +114,12 @@ public class ShuffleSinkHandle implements ISinkHandle {
public synchronized void send(TsBlock tsBlock) {
long startTime = System.nanoTime();
try {
+ checkState();
+ if (closed) {
+ return;
+ }
ISinkChannel currentChannel =
downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
- checkState();
currentChannel.send(tsBlock);
} finally {
switchChannelIfNecessary();
@@ -248,8 +251,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
private void checkState() {
if (aborted) {
throw new IllegalStateException("ShuffleSinkHandle is aborted.");
- } else if (closed) {
- throw new IllegalStateException("ShuffleSinkHandle is closed.");
}
}