You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/18 21:16:57 UTC

[iotdb] 03/03: Pipe: Skeleton Code Framework

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5692
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0c0b21bd79e098c88300c98887bf4dabbc44e66d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Mar 19 05:16:29 2023 +0800

    Pipe: Skeleton Code Framework
---
 .../pipe/task/meta/PipeTaskMetaAccessor.java       |  20 +---
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |   3 +
 .../pipe/agent/{ => plugin}/PipePluginAgent.java   |   5 +-
 .../HeartbeatScheduler.java}                       |  21 +----
 .../MetaSyncScheduler.java}                        |  20 +---
 .../pipe/agent/{ => runtime}/PipeRuntimeAgent.java |   4 +-
 .../db/pipe/agent/{ => task}/PipeTaskAgent.java    |   4 +-
 .../PipeTaskRegionAgent.java}                      |  20 +---
 .../collector/PipeCollectorEventPendingQueue.java} |  20 +---
 .../collector/PipeCollectorEventSelector.java}     |  20 +---
 .../historical/PipeHistoricalCollector.java}       |  20 +---
 .../collector/realtime/PipeRealtimeCollector.java} |  20 +---
 .../realtime/cache/PipeRealtimeEventCache.java}    |  20 +---
 .../realtime/listener/IoTLogListerner.java}        |  20 +---
 .../realtime/listener/RatisLogListener.java}       |  20 +---
 .../realtime/listener/SimpleLogListener.java}      |  20 +---
 .../listener/TsFileGenerationListener.java}        |  20 +---
 .../collector/realtime/matcher/Rule.java}          |  20 +---
 .../realtime/matcher/RulePrefixMatchTree.java}     |  20 +---
 .../collector/realtime/recorder/TsFileEpoch.java}  |  20 +---
 .../realtime/recorder/TsFileEpochRecorder.java}    |  20 +---
 .../connector/PipeConnectorContainer.java}         |  20 +---
 .../connector/PipeConnectorManager.java}           |  20 +---
 .../PipeConnectorPluginRuntimeWrapper.java}        |  19 ++--
 .../event/PipeTabletInsertionEvent.java}           |  30 +++---
 .../event/PipeTsFileInsertionEvent.java}           |  22 ++---
 .../iotdb/db/pipe/core/event/access/PipeRow.java   | 102 +++++++++++++++++++++
 .../db/pipe/core/event/access/PipeRowIterator.java |  60 ++++++++++++
 .../event/collector/PipeEventCollector.java}       |  27 +++---
 .../event/collector/PipeRowCollector.java}         |  21 ++---
 .../event/indexer/PipeEventIndexer.java}           |  20 +---
 .../event/indexer/PipeIoTEventIndexer.java}        |  20 +---
 .../event/indexer/PipeRatisEventIndexer.java}      |  20 +---
 .../event/indexer/PipeSimpleEventIndexer.java}     |  20 +---
 .../event/indexer/PipeTsFileEventIndexer.java}     |  20 +---
 .../PipeProcessorPluginRuntimeWrapper.java}        |  19 ++--
 .../executor/PipeAssignerSubtaskExecutor.java}     |  20 +---
 .../executor/PipeConnectorSubtaskExecutor.java}    |  20 +---
 .../executor/PipeProcessorSubtaskExecutor.java}    |  20 +---
 .../executor/PipeSubtaskExecutor.java}             |  20 +---
 .../pipe/execution/executor/PipeTaskExecutor.java  |  49 ++++++++++
 .../scheduler/PipeAssignerSubtaskScheduler.java}   |  24 +++--
 .../scheduler/PipeConnectorSubtaskScheduler.java}  |  24 +++--
 .../scheduler/PipeProcessorSubtaskScheduler.java}  |  24 +++--
 .../scheduler/PipeSubtaskScheduler.java}           |  21 ++---
 .../execution/scheduler/PipeTaskScheduler.java     |  60 ++++++++++++
 .../PipeFileManager.java}                          |  20 +---
 .../PipeRaftlogHolder.java}                        |  20 +---
 .../PipeTsFileHolder.java}                         |  20 +---
 .../PipeWALHolder.java}                            |  20 +---
 .../PipeRuntimeAgent.java => task/PipeTask.java}   |  34 ++++---
 .../PipeTaskBuilder.java}                          |  21 +----
 .../metrics/PipeTaskRuntimeRecorder.java}          |  20 +---
 .../runnable/PipeAssignerSubtask.java}             |  20 ++--
 .../runnable/PipeConnectorSubtask.java}            |  20 ++--
 .../runnable/PipeProcessorSubtask.java}            |  20 ++--
 .../runnable/PipeSubtask.java}                     |  20 ++--
 .../stage/PipeTaskCollectorStage.java}             |  25 +++--
 .../stage/PipeTaskConnectorStage.java}             |  25 +++--
 .../stage/PipeTaskProcessorStage.java}             |  25 +++--
 .../stage/PipeTaskStage.java}                      |  41 ++++++---
 .../impl/DataNodeInternalRPCServiceImpl.java       |   6 +-
 62 files changed, 571 insertions(+), 865 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
index 5034fb50e7..04653122c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.commons.pipe.task.meta;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTaskMetaAccessor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 7e3b3a1e87..1c50fe5b40 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.pipe.agent;
 
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
+import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
 
 /** PipeAgent is the entry point of the pipe module in DatNode. */
 public class PipeAgent {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 57cb0fb1b3..6831ac6a34 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
@@ -188,7 +188,8 @@ public class PipePluginAgent {
     private static PipePluginAgent instance = null;
   }
 
-  static PipePluginAgent setupAndGetInstance(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+  public static PipePluginAgent setupAndGetInstance(
+      DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
     if (PipePluginAgentServiceHolder.instance == null) {
       PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
index 5034fb50e7..de3f30f388 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
@@ -17,22 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */
+public class HeartbeatScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
index 5034fb50e7..366176cd1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class MetaSyncScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
similarity index 92%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index e42b1f66f3..cbfe53be8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
 
 public class PipeRuntimeAgent {
 
@@ -29,7 +29,7 @@ public class PipeRuntimeAgent {
     private static PipeRuntimeAgent INSTANCE = null;
   }
 
-  static PipeRuntimeAgent setupAndGetInstance() {
+  public static PipeRuntimeAgent setupAndGetInstance() {
     if (PipeRuntimeAgentHolder.INSTANCE == null) {
       PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
similarity index 92%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 5034fb50e7..dd8a6984f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.task;
 
 public class PipeTaskAgent {
 
@@ -29,7 +29,7 @@ public class PipeTaskAgent {
     private static PipeTaskAgent instance = null;
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
+  public static PipeTaskAgent setupAndGetInstance() {
     if (PipeTaskAgentHolder.instance == null) {
       PipeTaskAgentHolder.instance = new PipeTaskAgent();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
index 5034fb50e7..7be285ef66 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.task;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTaskRegionAgent {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
index 5034fb50e7..c3bfb68015 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeCollectorEventPendingQueue {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
index 5034fb50e7..e1c50a9601 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeCollectorEventSelector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
index 5034fb50e7..6e680de847 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.historical;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeHistoricalCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
index 5034fb50e7..b5fad778b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeRealtimeCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
index 5034fb50e7..7525e06b54 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.cache;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeRealtimeEventCache {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
index 5034fb50e7..b91334430d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class IoTLogListerner {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
index 5034fb50e7..3aa42354f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class RatisLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
index 5034fb50e7..b5e1eaf93b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class SimpleLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
index 5034fb50e7..4c28795219 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class TsFileGenerationListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
index 5034fb50e7..948fab2acd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class Rule {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
index 5034fb50e7..a543d7f0ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class RulePrefixMatchTree {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
index 5034fb50e7..a85d3a0b70 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class TsFileEpoch {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
index 5034fb50e7..6948cef200 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class TsFileEpochRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
index 5034fb50e7..8651ed3df1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeConnectorContainer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
index 5034fb50e7..881edd67f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeConnectorManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
index 5034fb50e7..c1c5be1166 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
@@ -17,22 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.PipeConnector;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeConnectorPluginRuntimeWrapper {
 
-  private PipeTaskAgent() {}
+  private final PipeConnector pipeConnector;
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  public PipeConnectorPluginRuntimeWrapper(PipeConnector pipeConnector) {
+    this.pipeConnector = pipeConnector;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
index e42b1f66f3..a58e8db497 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
@@ -17,22 +17,30 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event;
 
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.write.record.Tablet;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+import java.util.Iterator;
+import java.util.function.BiConsumer;
 
-  private PipeRuntimeAgent() {}
+public class PipeTabletInsertionEvent implements TabletInsertionEvent {
 
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
+  @Override
+  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
+    return null;
   }
 
-  static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
+  @Override
+  public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, RowCollector> consumer) {
+    return null;
+  }
+
+  @Override
+  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) {
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
index 5034fb50e7..d11b4f780d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
 
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  @Override
+  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+    return null;
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  @Override
+  public TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
new file mode 100644
index 0000000000..43b438445f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PipeRow implements Row {
+
+  @Override
+  public long getTime() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public int getInt(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public long getLong(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getFloat(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public double getDouble(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) throws IOException {
+    return false;
+  }
+
+  @Override
+  public Binary getBinary(int columnIndex) throws IOException {
+    return null;
+  }
+
+  @Override
+  public String getString(int columnIndex) throws IOException {
+    return null;
+  }
+
+  @Override
+  public Type getDataType(int columnIndex) {
+    return null;
+  }
+
+  @Override
+  public boolean isNull(int columnIndex) {
+    return false;
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+    return 0;
+  }
+
+  @Override
+  public List<Path> getColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<Type> getColumnTypes() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
new file mode 100644
index 0000000000..960214bea6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.access.RowIterator;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PipeRowIterator implements RowIterator {
+
+  @Override
+  public boolean hasNextRow() {
+    return false;
+  }
+
+  @Override
+  public Row next() throws IOException {
+    return null;
+  }
+
+  @Override
+  public void reset() {}
+
+  @Override
+  public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+    return 0;
+  }
+
+  @Override
+  public List<Path> getColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<Type> getColumnTypes() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
index e42b1f66f3..2c2bbfd38f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
@@ -17,22 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.collector;
 
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+import java.io.IOException;
 
-  private PipeRuntimeAgent() {}
+public class PipeEventCollector implements EventCollector {
 
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
-  }
+  @Override
+  public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {}
 
-  static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
-  }
+  @Override
+  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {}
+
+  @Override
+  public void collectDeletionEvent(DeletionEvent event) throws IOException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
index 5034fb50e7..525e79c137 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
@@ -17,22 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.collector;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+import java.io.IOException;
 
-  private PipeTaskAgent() {}
+public class PipeRowCollector implements RowCollector {
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void collectRow(Row row) throws IOException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
index 5034fb50e7..22cbce9e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public interface PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
index 5034fb50e7..63aff8a519 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeIoTEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
index 5034fb50e7..4520855085 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeRatisEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
index 5034fb50e7..d5add45547 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeSimpleEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
index 5034fb50e7..be53e89751 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTsFileEventIndexer implements PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
index 5034fb50e7..3153d2ae0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
@@ -17,22 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.processor;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.PipeProcessor;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeProcessorPluginRuntimeWrapper {
 
-  private PipeTaskAgent() {}
+  private final PipeProcessor pipeProcessor;
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  public PipeProcessorPluginRuntimeWrapper(PipeProcessor pipeProcessor) {
+    this.pipeProcessor = pipeProcessor;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
index 5034fb50e7..cc3dd43987 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeAssignerSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
index 5034fb50e7..98eaf31d1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeConnectorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
index 5034fb50e7..c61871fafe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeProcessorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 5034fb50e7..7d97605dff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public interface PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
new file mode 100644
index 0000000000..4437fb119d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+/**
+ * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the
+ * PipeTaskScheduler. It is a singleton class.
+ */
+public class PipeTaskExecutor {
+
+  private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor =
+      new PipeAssignerSubtaskExecutor();
+  private final PipeProcessorSubtaskExecutor processorSubtaskExecutor =
+      new PipeProcessorSubtaskExecutor();
+  private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor =
+      new PipeConnectorSubtaskExecutor();
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskExecutor() {}
+
+  private static class PipeTaskExecutorHolder {
+    private static PipeTaskExecutor instance = null;
+  }
+
+  public static PipeTaskExecutor setupAndGetInstance() {
+    if (PipeTaskExecutorHolder.instance == null) {
+      PipeTaskExecutorHolder.instance = new PipeTaskExecutor();
+    }
+    return PipeTaskExecutorHolder.instance;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
index 5034fb50e7..2cab31d737 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeAssignerSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
 
-  private PipeTaskAgent() {}
+  @Override
+  public void dropSubtask(String subtaskId) {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void startSubtask(String subtaskId) {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stopSubtask(String subtaskId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
index 5034fb50e7..c53c6b040d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeConnectorSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
 
-  private PipeTaskAgent() {}
+  @Override
+  public void dropSubtask(String subtaskId) {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void startSubtask(String subtaskId) {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stopSubtask(String subtaskId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
index 5034fb50e7..9f5df481b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeProcessorSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
 
-  private PipeTaskAgent() {}
+  @Override
+  public void dropSubtask(String subtaskId) {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void startSubtask(String subtaskId) {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stopSubtask(String subtaskId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
index 5034fb50e7..c87f949103 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -17,22 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public interface PipeSubtaskScheduler {
 
-  private PipeTaskAgent() {}
+  void createSubtask(String subtaskId, PipeSubtask subtask);
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  void dropSubtask(String subtaskId);
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  void startSubtask(String subtaskId);
+
+  void stopSubtask(String subtaskId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
new file mode 100644
index 0000000000..cda2cc9466
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.PipeTask;
+
+/**
+ * PipeTaskScheduler is responsible for scheduling the pipe tasks. It takes the pipe tasks and
+ * executes them in the PipeTaskExecutor. It is a singleton class.
+ */
+public class PipeTaskScheduler {
+
+  private final PipeSubtaskScheduler assignerSubtaskScheduler;
+  private final PipeSubtaskScheduler processorSubtaskScheduler;
+  private final PipeSubtaskScheduler connectorSubtaskScheduler;
+
+  public void createPipeTask(PipeTask pipeTask) {}
+
+  public void dropPipeTask(String pipeName) {}
+
+  public void startPipeTask(String pipeName) {}
+
+  public void stopPipeTask(String pipeName) {}
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskScheduler() {
+    assignerSubtaskScheduler = new PipeAssignerSubtaskScheduler();
+    processorSubtaskScheduler = new PipeProcessorSubtaskScheduler();
+    connectorSubtaskScheduler = new PipeConnectorSubtaskScheduler();
+  }
+
+  private static class PipeTaskSchedulerHolder {
+    private static PipeTaskScheduler instance = null;
+  }
+
+  public static PipeTaskScheduler setupAndGetInstance() {
+    if (PipeTaskSchedulerHolder.instance == null) {
+      PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
+    }
+    return PipeTaskSchedulerHolder.instance;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
index 5034fb50e7..25ce3d1142 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeFileManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
index 5034fb50e7..984c3fbaf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeRaftlogHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
index 5034fb50e7..04fe515e59 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTsFileHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
index 5034fb50e7..78e0a7c612 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeWALHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index e42b1f66f3..e3a2819deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -17,22 +17,32 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task;
 
-public class PipeRuntimeAgent {
+import org.apache.iotdb.db.pipe.task.metrics.PipeTaskRuntimeRecorder;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTask {
 
-  private PipeRuntimeAgent() {}
+  private final String pipeName;
 
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
-  }
+  private final PipeTaskStage collectorStage;
+  private final PipeTaskStage processorStage;
+  private final PipeTaskStage connectorStage;
+
+  private final PipeTaskRuntimeRecorder runtimeRecorder;
+
+  public PipeTask(
+      String pipeName,
+      PipeTaskStage collectorStage,
+      PipeTaskStage processorStage,
+      PipeTaskStage connectorStage) {
+    this.pipeName = pipeName;
+
+    this.collectorStage = collectorStage;
+    this.processorStage = processorStage;
+    this.connectorStage = connectorStage;
 
-  static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
+    runtimeRecorder = new PipeTaskRuntimeRecorder();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 5034fb50e7..19e5f460ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -17,22 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+/** PipeTaskBuilder is used to build a PipeTask. */
+public class PipeTaskBuilder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
index 5034fb50e7..6f09614958 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.metrics;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTaskRuntimeRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
index 5034fb50e7..5890801ea6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
@@ -17,22 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
 
-public class PipeTaskAgent {
+public class PipeAssignerSubtask extends PipeSubtask {
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeAssignerSubtask(String taskID) {
+    super(taskID);
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void runMayThrow() throws Throwable {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
index 5034fb50e7..b199607241 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
@@ -17,22 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
 
-public class PipeTaskAgent {
+public class PipeConnectorSubtask extends PipeSubtask {
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeConnectorSubtask(String taskID) {
+    super(taskID);
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void runMayThrow() throws Throwable {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
index 5034fb50e7..cfdf0123e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
@@ -17,22 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
 
-public class PipeTaskAgent {
+public class PipeProcessorSubtask extends PipeSubtask {
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeProcessorSubtask(String taskID) {
+    super(taskID);
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void runMayThrow() throws Throwable {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
index 5034fb50e7..daebd15e47 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public abstract class PipeSubtask extends WrappedRunnable {
 
-  private PipeTaskAgent() {}
+  private final String taskID;
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeSubtask(String taskID) {
+    super();
+    this.taskID = taskID;
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  public String getTaskID() {
+    return taskID;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 5034fb50e7..930a06cc94 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -17,22 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTaskCollectorStage implements PipeTaskStage {
 
-  private PipeTaskAgent() {}
+  @Override
+  public void create() throws PipeException {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void start() throws PipeException {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 5034fb50e7..fddb99fb5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -17,22 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTaskConnectorStage implements PipeTaskStage {
 
-  private PipeTaskAgent() {}
+  @Override
+  public void create() throws PipeException {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void start() throws PipeException {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
similarity index 62%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 5034fb50e7..5a22721a8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -17,22 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTaskProcessorStage implements PipeTaskStage {
 
-  private PipeTaskAgent() {}
+  @Override
+  public void create() throws PipeException {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void start() throws PipeException {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
similarity index 52%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index e42b1f66f3..09ae67ef76 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -17,22 +17,37 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
 
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public interface PipeTaskStage {
 
-  private PipeRuntimeAgent() {}
+  /**
+   * Create a pipe task stage.
+   *
+   * @throws PipeException if failed to create a pipe task stage.
+   */
+  void create() throws PipeException;
 
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
-  }
+  /**
+   * Start a pipe task stage.
+   *
+   * @throws PipeException if failed to start a pipe task stage.
+   */
+  void start() throws PipeException;
 
-  static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
-  }
+  /**
+   * Stop a pipe task stage.
+   *
+   * @throws PipeException if failed to stop a pipe task stage.
+   */
+  void stop() throws PipeException;
+
+  /**
+   * Drop a pipe task stage.
+   *
+   * @throws PipeException if failed to drop a pipe task stage.
+   */
+  void drop() throws PipeException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 308afe736d..39b4e59b9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.service.thrift.impl;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.iotdb.common.rpc.thrift.*;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
@@ -113,6 +112,8 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import com.google.common.collect.ImmutableList;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1068,7 +1069,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     if (configNodeLocations != null) {
       ConfigNodeInfo.getInstance()
           .updateConfigNodeList(
-              configNodeLocations.parallelStream()
+              configNodeLocations
+                  .parallelStream()
                   .map(TConfigNodeLocation::getInternalEndPoint)
                   .collect(Collectors.toList()));
     }