Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/07 10:35:22 UTC

[iotdb] branch master updated: Pipe: include creation time in PipeProcessorSubtask#taskID to avoid task scheduling issues after 1000+ pipes' creating and dropping (#11078)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba46d351ed5 Pipe: include creation time in PipeProcessorSubtask#taskID to avoid task scheduling issues after 1000+ pipes' creating and dropping (#11078)
ba46d351ed5 is described below

commit ba46d351ed5bbdee65873b95bbf2687967bc6ba1
Author: 马子坤 <55...@users.noreply.github.com>
AuthorDate: Thu Sep 7 18:35:16 2023 +0800

    Pipe: include creation time in PipeProcessorSubtask#taskID to avoid task scheduling issues after 1000+ pipes' creating and dropping (#11078)
    
    How to reproduce:
    
    * create 1000 devices and write some data
    * create and start 1000 pipes whose patterns match the devices
    * drop all pipes
    * delete all data on the receiver side
    * create and start the same pipes again
    * query the receiver side: the data has NOT been transferred
---
 .../apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java    | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 894e7d14990..ef7997cb443 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -85,7 +85,12 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       throw new PipeException(e.getMessage(), e);
     }
 
-    final String taskId = pipeName + "_" + dataRegionId;
+    // Include the creation time in taskId: subtasks are stored in the hashmap
+    // PipeProcessorSubtaskWorker.subtasks, and deleted subtasks are removed by
+    // a timed thread. If a pipe is deleted and created again before its subtask is
+    // removed, the new subtask has the same pipeName and dataRegionId as the old
+    // one, so creationTime is needed to keep the two subtasks distinct in the map.
+    final String taskId = pipeName + "_" + dataRegionId + "_" + creationTime;
     final PipeEventCollector pipeConnectorOutputEventCollector =
         new PipeEventCollector(pipeConnectorOutputPendingQueue);
     this.pipeProcessorSubtask =
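
The collision described in the code comment above can be illustrated with a small, self-contained sketch. It is not IoTDB code: the class TaskIdCollisionSketch, the String-keyed map, the pendingRemoval list, and the sample values are all hypothetical stand-ins, and the exact failure path inside PipeProcessorSubtaskWorker is an assumption. The sketch only shows why two subtasks that share pipeName and dataRegionId need an extra component, such as creationTime, to stay distinct in a map that is cleaned up later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in IoTDB.
public class TaskIdCollisionSketch {

  // Builds a task ID in either the old format (pipeName_dataRegionId)
  // or the new format (pipeName_dataRegionId_creationTime).
  static String taskId(
      String pipeName, int dataRegionId, long creationTime, boolean includeCreationTime) {
    String base = pipeName + "_" + dataRegionId;
    return includeCreationTime ? base + "_" + creationTime : base;
  }

  // Simulates: create pipe, drop pipe, recreate pipe, then deferred cleanup.
  // Returns true if the recreated subtask is still registered afterwards.
  static boolean recreatedSubtaskSurvives(boolean includeCreationTime) {
    // Assumption: a registry keyed by task ID, standing in for the subtask map.
    Map<String, String> subtasks = new HashMap<>();
    // Assumption: IDs of dropped subtasks waiting for the timed cleanup thread.
    List<String> pendingRemoval = new ArrayList<>();

    // First creation of the pipe.
    String oldId = taskId("pipe1", 1, 1000L, includeCreationTime);
    subtasks.put(oldId, "old subtask");

    // Drop the pipe: the subtask is only marked for later removal.
    pendingRemoval.add(oldId);

    // Recreate the same pipe before the cleanup has run.
    String newId = taskId("pipe1", 1, 2000L, includeCreationTime);
    subtasks.put(newId, "new subtask");

    // The deferred cleanup runs and removes entries by ID.
    for (String id : pendingRemoval) {
      subtasks.remove(id);
    }

    return subtasks.containsKey(newId);
  }

  public static void main(String[] args) {
    System.out.println("without creationTime: " + recreatedSubtaskSurvives(false));
    System.out.println("with creationTime:    " + recreatedSubtaskSurvives(true));
  }
}
```

With the old ID format both subtasks map to the same key, so the deferred cleanup in this sketch also removes the recreated subtask. With creationTime appended, the two IDs differ and the recreated subtask stays registered.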