You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/22 08:14:34 UTC

[iotdb] branch LocalSourceHandleNPE created (now 08b454b028)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch LocalSourceHandleNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 08b454b028 [IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak in SourceHandle

This branch includes the following new commits:

     new 08b454b028 [IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak in SourceHandle

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak in SourceHandle

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LocalSourceHandleNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 08b454b028006197cf8973b1a707a9ed0bffed69
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 22 16:14:13 2022 +0800

    [IOTDB-4199] Fix NPE in LocalSourceHandle and memory leak in SourceHandle
---
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 38 +++++++++++++++++-----
 .../db/mpp/execution/exchange/SourceHandle.java    | 10 +++---
 2 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index aacffe4796..1222ee5ef0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -34,6 +35,9 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.util.LinkedList;
 import java.util.Queue;
 
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
 /** This is not thread safe class, the caller should ensure multi-threads safety. */
 @NotThreadSafe
 public class SharedTsBlockQueue {
@@ -94,7 +98,8 @@ public class SharedTsBlockQueue {
   public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
     logger.info("SharedTsBlockQueue receive no more TsBlocks signal.");
     if (closed) {
-      throw new IllegalStateException("queue has been destroyed");
+      logger.warn("queue has been destroyed");
+      return;
     }
     this.noMoreTsBlocks = noMoreTsBlocks;
     if (!blocked.isDone()) {
@@ -135,21 +140,38 @@ public class SharedTsBlockQueue {
    */
   public ListenableFuture<Void> add(TsBlock tsBlock) {
     if (closed) {
-      throw new IllegalStateException("queue has been destroyed");
+      logger.warn("queue has been destroyed");
+      return immediateVoidFuture();
     }
 
     Validate.notNull(tsBlock, "TsBlock cannot be null");
     Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
-    blockedOnMemory =
+    Pair<ListenableFuture<Void>, Boolean> pair =
         localMemoryManager
             .getQueryPool()
-            .reserve(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes())
-            .left;
+            .reserve(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+    blockedOnMemory = pair.left;
     bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
-    queue.add(tsBlock);
-    if (!blocked.isDone()) {
-      blocked.set(null);
+
+    // reserve memory failed, we should wait until there is enough memory
+    if (!pair.right) {
+      blockedOnMemory.addListener(
+          () -> {
+            synchronized (this) {
+              queue.add(tsBlock);
+              if (!blocked.isDone()) {
+                blocked.set(null);
+              }
+            }
+          },
+          directExecutor());
+    } else { // reserve memory succeeded, add the TsBlock directly
+      queue.add(tsBlock);
+      if (!blocked.isDone()) {
+        blocked.set(null);
+      }
     }
+
     return blockedOnMemory;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 1e4fa305e4..c0a353f8b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -129,13 +129,11 @@ public class SourceHandle implements ISourceHandle {
       if (tsBlock == null) {
         return null;
       }
-      logger.info(
-          "Receive {} TsBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes());
+      long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+      logger.info("Receive {} TsBlock, size is {}", currSequenceId, retainedSize);
       currSequenceId += 1;
-      bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
-      localMemoryManager
-          .getQueryPool()
-          .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes());
+      bufferRetainedSizeInBytes -= retainedSize;
+      localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), retainedSize);
 
       if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
         logger.info("no buffered TsBlock, blocked");