You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/07 10:35:22 UTC
[iotdb] branch master updated: Pipe: include creation time in PipeProcessorSubtask#taskID to avoid task scheduling issues after 1000+ pipes' creating and dropping (#11078)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ba46d351ed5 Pipe: include creation time in PipeProcessorSubtask#taskID to avoid task scheduling issues after 1000+ pipes' creating and dropping (#11078)
ba46d351ed5 is described below
commit ba46d351ed5bbdee65873b95bbf2687967bc6ba1
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Thu Sep 7 18:35:16 2023 +0800
Pipe: include creation time in PipeProcessorSubtask#taskID to avoid task scheduling issues after 1000+ pipes' creating and dropping (#11078)
How to reproduce:
* create 1000 devices and write some data
* create and start 1000 pipes, their patterns matching the devices
* drop all pipes
* delete all data on receiver side
* create and start the same pipes again
* execute queries on receiver side, and the data are NOT transferred
---
.../apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 894e7d14990..ef7997cb443 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -85,7 +85,12 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
throw new PipeException(e.getMessage(), e);
}
- final String taskId = pipeName + "_" + dataRegionId;
+ // Should add creation time in taskID, because subtasks are stored in the hashmap
+ // PipeProcessorSubtaskWorker.subtasks, and deleted subtasks will be removed by
+ // a timed thread. If a pipe is deleted and created again before its subtask is
+ // removed, the new subtask will have the same pipeName and dataRegionId as the
+ // old one, so we need creationTime to make their hash code different in the map.
+ final String taskId = pipeName + "_" + dataRegionId + "_" + creationTime;
final PipeEventCollector pipeConnectorOutputEventCollector =
new PipeEventCollector(pipeConnectorOutputPendingQueue);
this.pipeProcessorSubtask =