You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/11 13:21:37 UTC

[iotdb] 01/01: Fix potential NPE in SinkChannel

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SinkChannelNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ca138649dd390c2eb1dd3e28c25f29c255fc501a
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 11 21:21:08 2023 +0800

    Fix potential NPE in SinkChannel
---
 .../mpp/execution/exchange/sink/ShuffleSinkHandle.java |  4 ++--
 .../db/mpp/execution/exchange/sink/SinkChannel.java    | 18 +++++++++++++-----
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 6f9b617e2e..a4d3f7c198 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -166,7 +166,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void abort() {
-    if (aborted) {
+    if (aborted || closed) {
       return;
     }
     LOGGER.debug("[StartAbortShuffleSinkHandle]");
@@ -192,7 +192,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void close() {
-    if (closed) {
+    if (closed || aborted) {
       return;
     }
     LOGGER.debug("[StartCloseShuffleSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index b32028bee1..1b027ba2ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -211,11 +211,13 @@ public class SinkChannel implements ISinkChannel {
   @Override
   public synchronized void abort() {
     LOGGER.debug("[StartAbortSinkChannel]");
-    if (aborted) {
+    if (aborted || closed) {
       return;
     }
     sequenceIdToTsBlock.clear();
-    bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+    if (blocked != null) {
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+    }
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
           .getQueryPool()
@@ -234,11 +236,13 @@ public class SinkChannel implements ISinkChannel {
   @Override
   public synchronized void close() {
     LOGGER.debug("[StartCloseSinkChannel]");
-    if (closed) {
+    if (closed || aborted) {
       return;
     }
     sequenceIdToTsBlock.clear();
-    bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+    if (blocked != null) {
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+    }
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
           .getQueryPool()
@@ -363,7 +367,11 @@ public class SinkChannel implements ISinkChannel {
 
   // region ============ ISinkChannel related ============
 
-  public void open() {
+  @Override
+  public synchronized void open() {
+    if (aborted || closed) {
+      return;
+    }
     // SinkChannel is opened when ShuffleSinkHandle choose it as the next channel
     this.blocked =
         localMemoryManager