You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/23 07:29:41 UTC

[iotdb] branch deviceView created (now cb90e36d96)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch deviceView
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at cb90e36d96 allow following pipeline run in advance

This branch includes the following new commits:

     new cb90e36d96 allow following pipeline run in advance

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: allow following pipeline run in advance

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deviceView
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cb90e36d96b88132768d6c47647bc526935f58c3
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 23 15:29:26 2023 +0800

    allow following pipeline run in advance
---
 .../db/mpp/execution/exchange/MPPDataExchangeManager.java   |  1 +
 .../iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java | 13 +++++++++++++
 2 files changed, 14 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index f51c4b77cb..2bf11da42f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -474,6 +474,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
             planNodeId,
             localMemoryManager);
+    queue.allowAddingTsBlock();
     return new LocalSinkHandle(
         queue,
         new PipelineSinkHandleListenerImpl(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index a40202de46..8c74d8027d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -111,6 +111,13 @@ public class SharedTsBlockQueue {
     return maxBytesCanReserve;
   }
 
+  /** Allow adding data to queue manually. */
+  public void allowAddingTsBlock() {
+    if (!canAddTsBlock.isDone()) {
+      canAddTsBlock.set(null);
+    }
+  }
+
   public ListenableFuture<Void> isBlocked() {
     if (!canAddTsBlock.isDone()) {
       canAddTsBlock.set(null);
@@ -155,6 +162,7 @@ public class SharedTsBlockQueue {
       throw new IllegalStateException("queue has been destroyed");
     }
     TsBlock tsBlock = queue.remove();
+    logger.info("PlanNode{} remove one TsBlock", localPlanNodeId);
     // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
     // corresponding LocalSinkHandle.
     if (sinkHandle != null) {
@@ -200,6 +208,10 @@ public class SharedTsBlockQueue {
 
     // reserve memory failed, we should wait until there is enough memory
     if (!pair.right) {
+      logger.info(
+          "PlanNode{} add one TsBlock failed because of full memory. Current size of queue is {}.",
+          localPlanNodeId,
+          queue.size());
       blockedOnMemory.addListener(
           () -> {
             synchronized (this) {
@@ -212,6 +224,7 @@ public class SharedTsBlockQueue {
           directExecutor());
     } else { // reserve memory succeeded, add the TsBlock directly
       queue.add(tsBlock);
+      logger.info("PlanNode{} add one TsBlock successfully", localPlanNodeId);
       if (!blocked.isDone()) {
         blocked.set(null);
       }