You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/20 03:26:24 UTC
[iotdb] branch master updated: [IOTDB-5798] Fix concurrent problem when sinkChannel acknowledgeTsBlock() and close()
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f13699611f [IOTDB-5798] Fix concurrent problem when sinkChannel acknowledgeTsBlock() and close()
f13699611f is described below
commit f13699611f9f11ee1d368b652f5e8d29a9c25995
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Apr 20 11:26:18 2023 +0800
[IOTDB-5798] Fix concurrent problem when sinkChannel acknowledgeTsBlock() and close()
---
.../mpp/execution/exchange/sink/SinkChannel.java | 23 +++++++++++-----------
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 9c8f8449fe..36bb0b3d1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -329,21 +329,22 @@ public class SinkChannel implements ISinkChannel {
iterator.remove();
LOGGER.debug("[ACKTsBlock] {}.", entry.getKey());
}
+
+ // there may exist duplicate ack message in network caused by caller retrying, if so duplicate
+ // ack message's freedBytes may be zero
+ if (freedBytes > 0) {
+ localMemoryManager
+ .getQueryPool()
+ .free(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ freedBytes);
+ }
}
if (isFinished()) {
sinkListener.onFinish(this);
}
- // there may exist duplicate ack message in network caused by caller retrying, if so duplicate
- // ack message's freedBytes may be zero
- if (freedBytes > 0) {
- localMemoryManager
- .getQueryPool()
- .free(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- freedBytes);
- }
}
@Override