You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/23 07:29:42 UTC

[iotdb] 01/01: allow following pipeline run in advance

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deviceView
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cb90e36d96b88132768d6c47647bc526935f58c3
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 23 15:29:26 2023 +0800

    allow following pipeline run in advance
---
 .../db/mpp/execution/exchange/MPPDataExchangeManager.java   |  1 +
 .../iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java | 13 +++++++++++++
 2 files changed, 14 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index f51c4b77cb..2bf11da42f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -474,6 +474,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
             planNodeId,
             localMemoryManager);
+    queue.allowAddingTsBlock();
     return new LocalSinkHandle(
         queue,
         new PipelineSinkHandleListenerImpl(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index a40202de46..8c74d8027d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -111,6 +111,13 @@ public class SharedTsBlockQueue {
     return maxBytesCanReserve;
   }
 
+  /** Allow adding data to queue manually. */
+  public void allowAddingTsBlock() {
+    if (!canAddTsBlock.isDone()) {
+      canAddTsBlock.set(null);
+    }
+  }
+
   public ListenableFuture<Void> isBlocked() {
     if (!canAddTsBlock.isDone()) {
       canAddTsBlock.set(null);
@@ -155,6 +162,7 @@ public class SharedTsBlockQueue {
       throw new IllegalStateException("queue has been destroyed");
     }
     TsBlock tsBlock = queue.remove();
+    logger.info("PlanNode{} remove one TsBlock", localPlanNodeId);
     // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
     // corresponding LocalSinkHandle.
     if (sinkHandle != null) {
@@ -200,6 +208,10 @@ public class SharedTsBlockQueue {
 
     // reserve memory failed, we should wait until there is enough memory
     if (!pair.right) {
+      logger.info(
+          "PlanNode{} add one TsBlock failed because of full memory. Current size of queue is {}.",
+          localPlanNodeId,
+          queue.size());
       blockedOnMemory.addListener(
           () -> {
             synchronized (this) {
@@ -212,6 +224,7 @@ public class SharedTsBlockQueue {
           directExecutor());
     } else { // reserve memory succeeded, add the TsBlock directly
       queue.add(tsBlock);
+      logger.info("PlanNode{} add one TsBlock successfully", localPlanNodeId);
       if (!blocked.isDone()) {
         blocked.set(null);
       }