You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/23 07:29:42 UTC
[iotdb] 01/01: allow following pipeline run in advance
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch deviceView
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cb90e36d96b88132768d6c47647bc526935f58c3
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Feb 23 15:29:26 2023 +0800
allow following pipeline run in advance
---
.../db/mpp/execution/exchange/MPPDataExchangeManager.java | 1 +
.../iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java | 13 +++++++++++++
2 files changed, 14 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index f51c4b77cb..2bf11da42f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -474,6 +474,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
planNodeId,
localMemoryManager);
+ queue.allowAddingTsBlock();
return new LocalSinkHandle(
queue,
new PipelineSinkHandleListenerImpl(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index a40202de46..8c74d8027d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -111,6 +111,13 @@ public class SharedTsBlockQueue {
return maxBytesCanReserve;
}
+ /** Manually allows adding data to the queue by completing canAddTsBlock if it is not done yet (isBlocked() does the same). */
+ public void allowAddingTsBlock() {
+ if (!canAddTsBlock.isDone()) {
+ canAddTsBlock.set(null);
+ }
+ }
+
public ListenableFuture<Void> isBlocked() {
if (!canAddTsBlock.isDone()) {
canAddTsBlock.set(null);
@@ -155,6 +162,7 @@ public class SharedTsBlockQueue {
throw new IllegalStateException("queue has been destroyed");
}
TsBlock tsBlock = queue.remove();
+ logger.info("PlanNode{} remove one TsBlock", localPlanNodeId);
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
// corresponding LocalSinkHandle.
if (sinkHandle != null) {
@@ -200,6 +208,10 @@ public class SharedTsBlockQueue {
// reserve memory failed, we should wait until there is enough memory
if (!pair.right) {
+ logger.info(
+ "PlanNode{} add one TsBlock failed because of full memory. Current size of queue is {}.",
+ localPlanNodeId,
+ queue.size());
blockedOnMemory.addListener(
() -> {
synchronized (this) {
@@ -212,6 +224,7 @@ public class SharedTsBlockQueue {
directExecutor());
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
+ logger.info("PlanNode{} add one TsBlock successfully", localPlanNodeId);
if (!blocked.isDone()) {
blocked.set(null);
}