You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/24 09:20:57 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] remove check of closed state in ISink

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 818f8e3738 [To rel/1.1] remove check of closed state in ISink
818f8e3738 is described below

commit 818f8e373859ac93bc5b6f80eab399f2b2304b46
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Apr 24 17:20:51 2023 +0800

    [To rel/1.1] remove check of closed state in ISink
---
 .../iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java     | 7 +++++++
 .../iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java    | 7 ++++---
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index 0e434aae80..d24c669c5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
 
@@ -83,6 +84,9 @@ public class LocalSinkChannel implements ISinkChannel {
   @Override
   public synchronized ListenableFuture<?> isFull() {
     checkState();
+    if (closed) {
+      return immediateVoidFuture();
+    }
     return nonCancellationPropagating(blocked);
   }
 
@@ -115,6 +119,9 @@ public class LocalSinkChannel implements ISinkChannel {
       Validate.notNull(tsBlock, "tsBlocks is null");
       synchronized (this) {
         checkState();
+        if (closed) {
+          return;
+        }
         if (!blocked.isDone()) {
           throw new IllegalStateException("Sink handle is blocked.");
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index b1e155b22f..11c69a2ec3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -114,9 +114,12 @@ public class ShuffleSinkHandle implements ISinkHandle {
   public synchronized void send(TsBlock tsBlock) {
     long startTime = System.nanoTime();
     try {
+      checkState();
+      if (closed) {
+        return;
+      }
       ISinkChannel currentChannel =
           downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
-      checkState();
       currentChannel.send(tsBlock);
     } finally {
       switchChannelIfNecessary();
@@ -248,8 +251,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
   private void checkState() {
     if (aborted) {
       throw new IllegalStateException("ShuffleSinkHandle is aborted.");
-    } else if (closed) {
-      throw new IllegalStateException("ShuffleSinkHandle is closed.");
     }
   }