You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/24 01:26:36 UTC

[iotdb] branch master updated: [IOTDB-4005] Allow the following child pipeline to run in advance

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8238428d3c [IOTDB-4005] Allow the following child pipeline to run in advance
8238428d3c is described below

commit 8238428d3cc9dedfb036c70a4775e7eb7e16eb8f
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Fri Feb 24 09:26:31 2023 +0800

    [IOTDB-4005] Allow the following child pipeline to run in advance
---
 .../iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java    | 1 +
 .../apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java | 7 +++++++
 2 files changed, 8 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index f51c4b77cb..2bf11da42f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -474,6 +474,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
             planNodeId,
             localMemoryManager);
+    queue.allowAddingTsBlock();
     return new LocalSinkHandle(
         queue,
         new PipelineSinkHandleListenerImpl(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index a40202de46..1574d8be3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -111,6 +111,13 @@ public class SharedTsBlockQueue {
     return maxBytesCanReserve;
   }
 
+  /** Allow adding data to queue manually. */
+  public void allowAddingTsBlock() {
+    if (!canAddTsBlock.isDone()) {
+      canAddTsBlock.set(null);
+    }
+  }
+
   public ListenableFuture<Void> isBlocked() {
     if (!canAddTsBlock.isDone()) {
       canAddTsBlock.set(null);