You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/11 13:21:37 UTC
[iotdb] 01/01: Fix potential NPE in SinkChannel
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch SinkChannelNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca138649dd390c2eb1dd3e28c25f29c255fc501a
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 11 21:21:08 2023 +0800
Fix potential NPE in SinkChannel
---
.../mpp/execution/exchange/sink/ShuffleSinkHandle.java | 4 ++--
.../db/mpp/execution/exchange/sink/SinkChannel.java | 18 +++++++++++++-----
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 6f9b617e2e..a4d3f7c198 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -166,7 +166,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
@Override
public synchronized void abort() {
- if (aborted) {
+ if (aborted || closed) {
return;
}
LOGGER.debug("[StartAbortShuffleSinkHandle]");
@@ -192,7 +192,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
@Override
public synchronized void close() {
- if (closed) {
+ if (closed || aborted) {
return;
}
LOGGER.debug("[StartCloseShuffleSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index b32028bee1..1b027ba2ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -211,11 +211,13 @@ public class SinkChannel implements ISinkChannel {
@Override
public synchronized void abort() {
LOGGER.debug("[StartAbortSinkChannel]");
- if (aborted) {
+ if (aborted || closed) {
return;
}
sequenceIdToTsBlock.clear();
- bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+ if (blocked != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+ }
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
@@ -234,11 +236,13 @@ public class SinkChannel implements ISinkChannel {
@Override
public synchronized void close() {
LOGGER.debug("[StartCloseSinkChannel]");
- if (closed) {
+ if (closed || aborted) {
return;
}
sequenceIdToTsBlock.clear();
- bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+ if (blocked != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+ }
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
@@ -363,7 +367,11 @@ public class SinkChannel implements ISinkChannel {
// region ============ ISinkChannel related ============
- public void open() {
+ @Override
+ public synchronized void open() {
+ if (aborted || closed) {
+ return;
+ }
// SinkChannel is opened when ShuffleSinkHandle choose it as the next channel
this.blocked =
localMemoryManager