You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/11 13:21:36 UTC

[iotdb] branch SinkChannelNPE created (now ca138649dd)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch SinkChannelNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at ca138649dd Fix potential NPE in SinkChannel

This branch includes the following new commits:

     new ca138649dd Fix potential NPE in SinkChannel

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Fix potential NPE in SinkChannel

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SinkChannelNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ca138649dd390c2eb1dd3e28c25f29c255fc501a
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 11 21:21:08 2023 +0800

    Fix potential NPE in SinkChannel
---
 .../mpp/execution/exchange/sink/ShuffleSinkHandle.java |  4 ++--
 .../db/mpp/execution/exchange/sink/SinkChannel.java    | 18 +++++++++++++-----
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 6f9b617e2e..a4d3f7c198 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -166,7 +166,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void abort() {
-    if (aborted) {
+    if (aborted || closed) {
       return;
     }
     LOGGER.debug("[StartAbortShuffleSinkHandle]");
@@ -192,7 +192,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void close() {
-    if (closed) {
+    if (closed || aborted) {
       return;
     }
     LOGGER.debug("[StartCloseShuffleSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index b32028bee1..1b027ba2ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -211,11 +211,13 @@ public class SinkChannel implements ISinkChannel {
   @Override
   public synchronized void abort() {
     LOGGER.debug("[StartAbortSinkChannel]");
-    if (aborted) {
+    if (aborted || closed) {
       return;
     }
     sequenceIdToTsBlock.clear();
-    bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+    if (blocked != null) {
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+    }
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
           .getQueryPool()
@@ -234,11 +236,13 @@ public class SinkChannel implements ISinkChannel {
   @Override
   public synchronized void close() {
     LOGGER.debug("[StartCloseSinkChannel]");
-    if (closed) {
+    if (closed || aborted) {
       return;
     }
     sequenceIdToTsBlock.clear();
-    bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+    if (blocked != null) {
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+    }
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
           .getQueryPool()
@@ -363,7 +367,11 @@ public class SinkChannel implements ISinkChannel {
 
   // region ============ ISinkChannel related ============
 
-  public void open() {
+  @Override
+  public synchronized void open() {
+    if (aborted || closed) {
+      return;
+    }
     // SinkChannel is opened when ShuffleSinkHandle choose it as the next channel
     this.blocked =
         localMemoryManager