You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/18 21:16:54 UTC

[iotdb] branch IOTDB-5692 created (now 0c0b21bd79)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch IOTDB-5692
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 0c0b21bd79 Pipe: Skeleton Code Framework

This branch includes the following new commits:

     new a5139bd442 pipe agent skeleton
     new a6c8bdba4c refactor pipe agent skeleton
     new 0c0b21bd79 Pipe: Skeleton Code Framework

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 03/03: Pipe: Skeleton Code Framework

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5692
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0c0b21bd79e098c88300c98887bf4dabbc44e66d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Mar 19 05:16:29 2023 +0800

    Pipe: Skeleton Code Framework
---
 .../pipe/task/meta/PipeTaskMetaAccessor.java       |  20 +---
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |   3 +
 .../pipe/agent/{ => plugin}/PipePluginAgent.java   |   5 +-
 .../HeartbeatScheduler.java}                       |  21 +----
 .../MetaSyncScheduler.java}                        |  20 +---
 .../pipe/agent/{ => runtime}/PipeRuntimeAgent.java |   4 +-
 .../db/pipe/agent/{ => task}/PipeTaskAgent.java    |   4 +-
 .../PipeTaskRegionAgent.java}                      |  20 +---
 .../collector/PipeCollectorEventPendingQueue.java} |  20 +---
 .../collector/PipeCollectorEventSelector.java}     |  20 +---
 .../historical/PipeHistoricalCollector.java}       |  20 +---
 .../collector/realtime/PipeRealtimeCollector.java} |  20 +---
 .../realtime/cache/PipeRealtimeEventCache.java}    |  20 +---
 .../realtime/listener/IoTLogListerner.java}        |  20 +---
 .../realtime/listener/RatisLogListener.java}       |  20 +---
 .../realtime/listener/SimpleLogListener.java}      |  20 +---
 .../listener/TsFileGenerationListener.java}        |  20 +---
 .../collector/realtime/matcher/Rule.java}          |  20 +---
 .../realtime/matcher/RulePrefixMatchTree.java}     |  20 +---
 .../collector/realtime/recorder/TsFileEpoch.java}  |  20 +---
 .../realtime/recorder/TsFileEpochRecorder.java}    |  20 +---
 .../connector/PipeConnectorContainer.java}         |  20 +---
 .../connector/PipeConnectorManager.java}           |  20 +---
 .../PipeConnectorPluginRuntimeWrapper.java}        |  19 ++--
 .../event/PipeTabletInsertionEvent.java}           |  30 +++---
 .../event/PipeTsFileInsertionEvent.java}           |  22 ++---
 .../iotdb/db/pipe/core/event/access/PipeRow.java   | 102 +++++++++++++++++++++
 .../db/pipe/core/event/access/PipeRowIterator.java |  60 ++++++++++++
 .../event/collector/PipeEventCollector.java}       |  27 +++---
 .../event/collector/PipeRowCollector.java}         |  21 ++---
 .../event/indexer/PipeEventIndexer.java}           |  20 +---
 .../event/indexer/PipeIoTEventIndexer.java}        |  20 +---
 .../event/indexer/PipeRatisEventIndexer.java}      |  20 +---
 .../event/indexer/PipeSimpleEventIndexer.java}     |  20 +---
 .../event/indexer/PipeTsFileEventIndexer.java}     |  20 +---
 .../PipeProcessorPluginRuntimeWrapper.java}        |  19 ++--
 .../executor/PipeAssignerSubtaskExecutor.java}     |  20 +---
 .../executor/PipeConnectorSubtaskExecutor.java}    |  20 +---
 .../executor/PipeProcessorSubtaskExecutor.java}    |  20 +---
 .../executor/PipeSubtaskExecutor.java}             |  20 +---
 .../pipe/execution/executor/PipeTaskExecutor.java  |  49 ++++++++++
 .../scheduler/PipeAssignerSubtaskScheduler.java}   |  24 +++--
 .../scheduler/PipeConnectorSubtaskScheduler.java}  |  24 +++--
 .../scheduler/PipeProcessorSubtaskScheduler.java}  |  24 +++--
 .../scheduler/PipeSubtaskScheduler.java}           |  21 ++---
 .../execution/scheduler/PipeTaskScheduler.java     |  60 ++++++++++++
 .../PipeFileManager.java}                          |  20 +---
 .../PipeRaftlogHolder.java}                        |  20 +---
 .../PipeTsFileHolder.java}                         |  20 +---
 .../PipeWALHolder.java}                            |  20 +---
 .../PipeRuntimeAgent.java => task/PipeTask.java}   |  34 ++++---
 .../PipeTaskBuilder.java}                          |  21 +----
 .../metrics/PipeTaskRuntimeRecorder.java}          |  20 +---
 .../runnable/PipeAssignerSubtask.java}             |  20 ++--
 .../runnable/PipeConnectorSubtask.java}            |  20 ++--
 .../runnable/PipeProcessorSubtask.java}            |  20 ++--
 .../runnable/PipeSubtask.java}                     |  20 ++--
 .../stage/PipeTaskCollectorStage.java}             |  25 +++--
 .../stage/PipeTaskConnectorStage.java}             |  25 +++--
 .../stage/PipeTaskProcessorStage.java}             |  25 +++--
 .../stage/PipeTaskStage.java}                      |  41 ++++++---
 .../impl/DataNodeInternalRPCServiceImpl.java       |   6 +-
 62 files changed, 571 insertions(+), 865 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
index 5034fb50e7..04653122c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.commons.pipe.task.meta;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTaskMetaAccessor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 7e3b3a1e87..1c50fe5b40 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.pipe.agent;
 
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
+import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
 
 /** PipeAgent is the entry point of the pipe module in DatNode. */
 public class PipeAgent {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 57cb0fb1b3..6831ac6a34 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
@@ -188,7 +188,8 @@ public class PipePluginAgent {
     private static PipePluginAgent instance = null;
   }
 
-  static PipePluginAgent setupAndGetInstance(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+  public static PipePluginAgent setupAndGetInstance(
+      DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
     if (PipePluginAgentServiceHolder.instance == null) {
       PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
index 5034fb50e7..de3f30f388 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
@@ -17,22 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */
+public class HeartbeatScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
index 5034fb50e7..366176cd1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class MetaSyncScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
similarity index 92%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index e42b1f66f3..cbfe53be8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
 
 public class PipeRuntimeAgent {
 
@@ -29,7 +29,7 @@ public class PipeRuntimeAgent {
     private static PipeRuntimeAgent INSTANCE = null;
   }
 
-  static PipeRuntimeAgent setupAndGetInstance() {
+  public static PipeRuntimeAgent setupAndGetInstance() {
     if (PipeRuntimeAgentHolder.INSTANCE == null) {
       PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
similarity index 92%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 5034fb50e7..dd8a6984f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.task;
 
 public class PipeTaskAgent {
 
@@ -29,7 +29,7 @@ public class PipeTaskAgent {
     private static PipeTaskAgent instance = null;
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
+  public static PipeTaskAgent setupAndGetInstance() {
     if (PipeTaskAgentHolder.instance == null) {
       PipeTaskAgentHolder.instance = new PipeTaskAgent();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
index 5034fb50e7..7be285ef66 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.task;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTaskRegionAgent {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
index 5034fb50e7..c3bfb68015 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeCollectorEventPendingQueue {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
index 5034fb50e7..e1c50a9601 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeCollectorEventSelector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
index 5034fb50e7..6e680de847 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.historical;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeHistoricalCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
index 5034fb50e7..b5fad778b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeRealtimeCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
index 5034fb50e7..7525e06b54 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.cache;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeRealtimeEventCache {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
index 5034fb50e7..b91334430d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class IoTLogListerner {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
index 5034fb50e7..3aa42354f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class RatisLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
index 5034fb50e7..b5e1eaf93b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class SimpleLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
index 5034fb50e7..4c28795219 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class TsFileGenerationListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
index 5034fb50e7..948fab2acd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class Rule {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
index 5034fb50e7..a543d7f0ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class RulePrefixMatchTree {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
index 5034fb50e7..a85d3a0b70 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class TsFileEpoch {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
index 5034fb50e7..6948cef200 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class TsFileEpochRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
index 5034fb50e7..8651ed3df1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeConnectorContainer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
index 5034fb50e7..881edd67f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeConnectorManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
index 5034fb50e7..c1c5be1166 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
@@ -17,22 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.PipeConnector;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeConnectorPluginRuntimeWrapper {
 
-  private PipeTaskAgent() {}
+  private final PipeConnector pipeConnector;
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  public PipeConnectorPluginRuntimeWrapper(PipeConnector pipeConnector) {
+    this.pipeConnector = pipeConnector;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
index e42b1f66f3..a58e8db497 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
@@ -17,22 +17,30 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event;
 
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.write.record.Tablet;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+import java.util.Iterator;
+import java.util.function.BiConsumer;
 
-  private PipeRuntimeAgent() {}
+public class PipeTabletInsertionEvent implements TabletInsertionEvent {
 
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
+  @Override
+  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
+    return null;
   }
 
-  static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
+  @Override
+  public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, RowCollector> consumer) {
+    return null;
+  }
+
+  @Override
+  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) {
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
index 5034fb50e7..d11b4f780d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
 
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  @Override
+  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+    return null;
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  @Override
+  public TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
new file mode 100644
index 0000000000..43b438445f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PipeRow implements Row {
+
+  @Override
+  public long getTime() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public int getInt(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public long getLong(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getFloat(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public double getDouble(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) throws IOException {
+    return false;
+  }
+
+  @Override
+  public Binary getBinary(int columnIndex) throws IOException {
+    return null;
+  }
+
+  @Override
+  public String getString(int columnIndex) throws IOException {
+    return null;
+  }
+
+  @Override
+  public Type getDataType(int columnIndex) {
+    return null;
+  }
+
+  @Override
+  public boolean isNull(int columnIndex) {
+    return false;
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+    return 0;
+  }
+
+  @Override
+  public List<Path> getColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<Type> getColumnTypes() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
new file mode 100644
index 0000000000..960214bea6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.access.RowIterator;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PipeRowIterator implements RowIterator {
+
+  @Override
+  public boolean hasNextRow() {
+    return false;
+  }
+
+  @Override
+  public Row next() throws IOException {
+    return null;
+  }
+
+  @Override
+  public void reset() {}
+
+  @Override
+  public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+    return 0;
+  }
+
+  @Override
+  public List<Path> getColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<Type> getColumnTypes() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
index e42b1f66f3..2c2bbfd38f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
@@ -17,22 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.collector;
 
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+import java.io.IOException;
 
-  private PipeRuntimeAgent() {}
+public class PipeEventCollector implements EventCollector {
 
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
-  }
+  @Override
+  public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {}
 
-  static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
-  }
+  @Override
+  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {}
+
+  @Override
+  public void collectDeletionEvent(DeletionEvent event) throws IOException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
index 5034fb50e7..525e79c137 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
@@ -17,22 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.collector;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+import java.io.IOException;
 
-  private PipeTaskAgent() {}
+public class PipeRowCollector implements RowCollector {
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void collectRow(Row row) throws IOException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
index 5034fb50e7..22cbce9e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public interface PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
index 5034fb50e7..63aff8a519 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeIoTEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
index 5034fb50e7..4520855085 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeRatisEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
index 5034fb50e7..d5add45547 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeSimpleEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
index 5034fb50e7..be53e89751 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTsFileEventIndexer implements PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
index 5034fb50e7..3153d2ae0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
@@ -17,22 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.processor;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.PipeProcessor;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeProcessorPluginRuntimeWrapper {
 
-  private PipeTaskAgent() {}
+  private final PipeProcessor pipeProcessor;
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  public PipeProcessorPluginRuntimeWrapper(PipeProcessor pipeProcessor) {
+    this.pipeProcessor = pipeProcessor;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
index 5034fb50e7..cc3dd43987 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeAssignerSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
index 5034fb50e7..98eaf31d1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeConnectorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
index 5034fb50e7..c61871fafe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeProcessorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 5034fb50e7..7d97605dff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public interface PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
new file mode 100644
index 0000000000..4437fb119d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+/**
+ * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the
+ * PipeTaskScheduler. It is a singleton class.
+ */
+public class PipeTaskExecutor {
+
+  private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor =
+      new PipeAssignerSubtaskExecutor();
+  private final PipeProcessorSubtaskExecutor processorSubtaskExecutor =
+      new PipeProcessorSubtaskExecutor();
+  private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor =
+      new PipeConnectorSubtaskExecutor();
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskExecutor() {}
+
+  private static class PipeTaskExecutorHolder {
+    private static PipeTaskExecutor instance = null;
+  }
+
+  public static PipeTaskExecutor setupAndGetInstance() {
+    if (PipeTaskExecutorHolder.instance == null) {
+      PipeTaskExecutorHolder.instance = new PipeTaskExecutor();
+    }
+    return PipeTaskExecutorHolder.instance;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
index 5034fb50e7..2cab31d737 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeAssignerSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
 
-  private PipeTaskAgent() {}
+  @Override
+  public void dropSubtask(String subtaskId) {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void startSubtask(String subtaskId) {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stopSubtask(String subtaskId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
index 5034fb50e7..c53c6b040d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeConnectorSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
 
-  private PipeTaskAgent() {}
+  @Override
+  public void dropSubtask(String subtaskId) {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void startSubtask(String subtaskId) {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stopSubtask(String subtaskId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
index 5034fb50e7..9f5df481b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeProcessorSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
 
-  private PipeTaskAgent() {}
+  @Override
+  public void dropSubtask(String subtaskId) {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void startSubtask(String subtaskId) {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stopSubtask(String subtaskId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
index 5034fb50e7..c87f949103 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -17,22 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public interface PipeSubtaskScheduler {
 
-  private PipeTaskAgent() {}
+  void createSubtask(String subtaskId, PipeSubtask subtask);
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  void dropSubtask(String subtaskId);
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  void startSubtask(String subtaskId);
+
+  void stopSubtask(String subtaskId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
new file mode 100644
index 0000000000..cda2cc9466
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.PipeTask;
+
+/**
+ * PipeTaskScheduler is responsible for scheduling the pipe tasks. It takes the pipe tasks and
+ * executes them in the PipeTaskExecutor. It is a singleton class.
+ */
+public class PipeTaskScheduler {
+
+  private final PipeSubtaskScheduler assignerSubtaskScheduler;
+  private final PipeSubtaskScheduler processorSubtaskScheduler;
+  private final PipeSubtaskScheduler connectorSubtaskScheduler;
+
+  public void createPipeTask(PipeTask pipeTask) {}
+
+  public void dropPipeTask(String pipeName) {}
+
+  public void startPipeTask(String pipeName) {}
+
+  public void stopPipeTask(String pipeName) {}
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskScheduler() {
+    assignerSubtaskScheduler = new PipeAssignerSubtaskScheduler();
+    processorSubtaskScheduler = new PipeProcessorSubtaskScheduler();
+    connectorSubtaskScheduler = new PipeConnectorSubtaskScheduler();
+  }
+
+  private static class PipeTaskSchedulerHolder {
+    private static PipeTaskScheduler instance = null;
+  }
+
+  public static PipeTaskScheduler setupAndGetInstance() {
+    if (PipeTaskSchedulerHolder.instance == null) {
+      PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
+    }
+    return PipeTaskSchedulerHolder.instance;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
index 5034fb50e7..25ce3d1142 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeFileManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
index 5034fb50e7..984c3fbaf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeRaftlogHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
index 5034fb50e7..04fe515e59 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTsFileHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
index 5034fb50e7..78e0a7c612 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeWALHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index e42b1f66f3..e3a2819deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -17,22 +17,32 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task;
 
-public class PipeRuntimeAgent {
+import org.apache.iotdb.db.pipe.task.metrics.PipeTaskRuntimeRecorder;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTask {
 
-  private PipeRuntimeAgent() {}
+  private final String pipeName;
 
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
-  }
+  private final PipeTaskStage collectorStage;
+  private final PipeTaskStage processorStage;
+  private final PipeTaskStage connectorStage;
+
+  private final PipeTaskRuntimeRecorder runtimeRecorder;
+
+  public PipeTask(
+      String pipeName,
+      PipeTaskStage collectorStage,
+      PipeTaskStage processorStage,
+      PipeTaskStage connectorStage) {
+    this.pipeName = pipeName;
+
+    this.collectorStage = collectorStage;
+    this.processorStage = processorStage;
+    this.connectorStage = connectorStage;
 
-  static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
+    runtimeRecorder = new PipeTaskRuntimeRecorder();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 5034fb50e7..19e5f460ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -17,22 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+/** PipeTaskBuilder is used to build a PipeTask. */
+public class PipeTaskBuilder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
index 5034fb50e7..6f09614958 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
@@ -17,22 +17,6 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.metrics;
 
-public class PipeTaskAgent {
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
-
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
-}
+public class PipeTaskRuntimeRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
index 5034fb50e7..5890801ea6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
@@ -17,22 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
 
-public class PipeTaskAgent {
+public class PipeAssignerSubtask extends PipeSubtask {
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeAssignerSubtask(String taskID) {
+    super(taskID);
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void runMayThrow() throws Throwable {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
index 5034fb50e7..b199607241 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
@@ -17,22 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
 
-public class PipeTaskAgent {
+public class PipeConnectorSubtask extends PipeSubtask {
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeConnectorSubtask(String taskID) {
+    super(taskID);
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void runMayThrow() throws Throwable {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
index 5034fb50e7..cfdf0123e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
@@ -17,22 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
 
-public class PipeTaskAgent {
+public class PipeProcessorSubtask extends PipeSubtask {
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskAgent() {}
-
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeProcessorSubtask(String taskID) {
+    super(taskID);
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void runMayThrow() throws Throwable {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
index 5034fb50e7..daebd15e47 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
@@ -17,22 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public abstract class PipeSubtask extends WrappedRunnable {
 
-  private PipeTaskAgent() {}
+  private final String taskID;
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
+  public PipeSubtask(String taskID) {
+    super();
+    this.taskID = taskID;
   }
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
+  public String getTaskID() {
+    return taskID;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 5034fb50e7..930a06cc94 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -17,22 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTaskCollectorStage implements PipeTaskStage {
 
-  private PipeTaskAgent() {}
+  @Override
+  public void create() throws PipeException {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void start() throws PipeException {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 5034fb50e7..fddb99fb5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -17,22 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTaskConnectorStage implements PipeTaskStage {
 
-  private PipeTaskAgent() {}
+  @Override
+  public void create() throws PipeException {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void start() throws PipeException {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
similarity index 62%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 5034fb50e7..5a22721a8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -17,22 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
 
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public class PipeTaskProcessorStage implements PipeTaskStage {
 
-  private PipeTaskAgent() {}
+  @Override
+  public void create() throws PipeException {}
 
-  private static class PipeTaskAgentHolder {
-    private static PipeTaskAgent instance = null;
-  }
+  @Override
+  public void start() throws PipeException {}
 
-  static PipeTaskAgent setupAndGetInstance() {
-    if (PipeTaskAgentHolder.instance == null) {
-      PipeTaskAgentHolder.instance = new PipeTaskAgent();
-    }
-    return PipeTaskAgentHolder.instance;
-  }
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
similarity index 52%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index e42b1f66f3..09ae67ef76 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -17,22 +17,37 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
 
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
+public interface PipeTaskStage {
 
-  private PipeRuntimeAgent() {}
+  /**
+   * Create a pipe task stage.
+   *
+   * @throws PipeException if failed to create a pipe task stage.
+   */
+  void create() throws PipeException;
 
-  private static class PipeRuntimeAgentHolder {
-    private static PipeRuntimeAgent INSTANCE = null;
-  }
+  /**
+   * Start a pipe task stage.
+   *
+   * @throws PipeException if failed to start a pipe task stage.
+   */
+  void start() throws PipeException;
 
-  static PipeRuntimeAgent setupAndGetInstance() {
-    if (PipeRuntimeAgentHolder.INSTANCE == null) {
-      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
-    }
-    return PipeRuntimeAgentHolder.INSTANCE;
-  }
+  /**
+   * Stop a pipe task stage.
+   *
+   * @throws PipeException if failed to stop a pipe task stage.
+   */
+  void stop() throws PipeException;
+
+  /**
+   * Drop a pipe task stage.
+   *
+   * @throws PipeException if failed to drop a pipe task stage.
+   */
+  void drop() throws PipeException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 308afe736d..39b4e59b9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.service.thrift.impl;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.iotdb.common.rpc.thrift.*;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
@@ -113,6 +112,8 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import com.google.common.collect.ImmutableList;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1068,7 +1069,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     if (configNodeLocations != null) {
       ConfigNodeInfo.getInstance()
           .updateConfigNodeList(
-              configNodeLocations.parallelStream()
+              configNodeLocations
+                  .parallelStream()
                   .map(TConfigNodeLocation::getInternalEndPoint)
                   .collect(Collectors.toList()));
     }


[iotdb] 01/03: pipe agent skeleton

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5692
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a5139bd4420dba28325eecb91637943f7cfadeb9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Mar 17 12:24:28 2023 +0800

    pipe agent skeleton
---
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  | 53 ++++++++++++
 .../iotdb/db/pipe/agent/PipePluginAgent.java       |  4 +-
 .../iotdb/db/pipe/agent/PipeRuntimeAgent.java      | 35 ++++++++
 .../apache/iotdb/db/pipe/agent/PipeTaskAgent.java  | 35 ++++++++
 .../impl/DataNodeInternalRPCServiceImpl.java       | 98 ++--------------------
 5 files changed, 135 insertions(+), 90 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
new file mode 100644
index 0000000000..e94807e8e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent;
+
+public class PipeAgent {
+
+  /** Private constructor to prevent users from creating a new instance. */
+  private PipeAgent() {}
+
+  /**
+   * Get the singleton instance of PipeTaskAgent.
+   *
+   * @return the singleton instance of PipeTaskAgent
+   */
+  public static PipeTaskAgent task() {
+    return PipeTaskAgent.getInstance();
+  }
+
+  /**
+   * Get the singleton instance of PipePluginAgent.
+   *
+   * @return the singleton instance of PipePluginAgent
+   */
+  public static PipePluginAgent plugin() {
+    return PipePluginAgent.getInstance();
+  }
+
+  /**
+   * Get the singleton instance of PipeRuntimeAgent.
+   *
+   * @return the singleton instance of PipeRuntimeAgent
+   */
+  public static PipeRuntimeAgent runtime() {
+    return PipeRuntimeAgent.getInstance();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
index d820441dac..a0a62ca118 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
@@ -181,11 +181,13 @@ public class PipePluginAgent {
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
+  private PipePluginAgent() {}
+
   private static class PipePluginAgentServiceHolder {
     private static final PipePluginAgent INSTANCE = new PipePluginAgent();
   }
 
-  public static PipePluginAgent getInstance() {
+  static PipePluginAgent getInstance() {
     return PipePluginAgentServiceHolder.INSTANCE;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
new file mode 100644
index 0000000000..fe9ab3ee8a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent;
+
+public class PipeRuntimeAgent {
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeRuntimeAgent() {}
+
+  private static class PipeRuntimeAgentHolder {
+    private static final PipeRuntimeAgent INSTANCE = new PipeRuntimeAgent();
+  }
+
+  static PipeRuntimeAgent getInstance() {
+    return PipeRuntimeAgent.PipeRuntimeAgentHolder.INSTANCE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
new file mode 100644
index 0000000000..75c8e14636
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent;
+
+public class PipeTaskAgent {
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskAgent() {}
+
+  private static class PipeTaskAgentHolder {
+    private static final PipeTaskAgent INSTANCE = new PipeTaskAgent();
+  }
+
+  static PipeTaskAgent getInstance() {
+    return PipeTaskAgent.PipeTaskAgentHolder.INSTANCE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 60edda2b56..308afe736d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,14 +19,8 @@
 
 package org.apache.iotdb.db.service.thrift.impl;
 
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TFlushReq;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.common.rpc.thrift.TSettleReq;
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.common.rpc.thrift.*;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -92,17 +86,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackPreDeactivateTemplateNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.*;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-import org.apache.iotdb.db.pipe.agent.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
@@ -115,68 +104,7 @@ import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.metrics.type.AutoGauge;
 import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
-import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteModelMetricsReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceInfoReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFetchWindowBatchResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
-import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
-import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
-import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
-import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
-import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
-import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRecordModelMetricsReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
-import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
-import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
+import org.apache.iotdb.mpp.rpc.thrift.*;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.trigger.api.enums.FailureStrategy;
@@ -185,8 +113,6 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.write.record.Tablet;
-
-import com.google.common.collect.ImmutableList;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -195,12 +121,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -1147,8 +1068,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     if (configNodeLocations != null) {
       ConfigNodeInfo.getInstance()
           .updateConfigNodeList(
-              configNodeLocations
-                  .parallelStream()
+              configNodeLocations.parallelStream()
                   .map(TConfigNodeLocation::getInternalEndPoint)
                   .collect(Collectors.toList()));
     }
@@ -1482,7 +1402,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   public TSStatus createPipePlugin(TCreatePipePluginInstanceReq req) {
     try {
       PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(req.pipePluginMeta);
-      PipePluginAgent.getInstance().register(pipePluginMeta, req.jarFile);
+      PipeAgent.plugin().register(pipePluginMeta, req.jarFile);
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (Exception e) {
       return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())
@@ -1493,7 +1413,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   @Override
   public TSStatus dropPipePlugin(TDropPipePluginInstanceReq req) {
     try {
-      PipePluginAgent.getInstance().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
+      PipeAgent.plugin().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (Exception e) {
       return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())


[iotdb] 02/03: refactor pipe agent skeleton

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5692
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a6c8bdba4ca6efe19b19e0b0705c4439f9e73f01
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Mar 17 12:47:49 2023 +0800

    refactor pipe agent skeleton
---
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  | 26 ++++++++++++++++++----
 .../iotdb/db/pipe/agent/PipePluginAgent.java       | 16 ++++++++-----
 .../iotdb/db/pipe/agent/PipeRuntimeAgent.java      |  9 +++++---
 .../apache/iotdb/db/pipe/agent/PipeTaskAgent.java  |  9 +++++---
 4 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index e94807e8e1..7e3b3a1e87 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -19,10 +19,28 @@
 
 package org.apache.iotdb.db.pipe.agent;
 
+import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
+
+/** PipeAgent is the entry point of the pipe module in DatNode. */
 public class PipeAgent {
 
+  private final PipePluginAgent pipePluginAgent;
+  private final PipeTaskAgent pipeTaskAgent;
+  private final PipeRuntimeAgent pipeRuntimeAgent;
+
   /** Private constructor to prevent users from creating a new instance. */
-  private PipeAgent() {}
+  private PipeAgent() {
+    final DataNodePipePluginMetaKeeper pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
+
+    pipePluginAgent = PipePluginAgent.setupAndGetInstance(pipePluginMetaKeeper);
+    pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
+    pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
+  }
+
+  /** The singleton holder of PipeAgent. */
+  private static class PipeAgentHolder {
+    private static final PipeAgent HANDLE = new PipeAgent();
+  }
 
   /**
    * Get the singleton instance of PipeTaskAgent.
@@ -30,7 +48,7 @@ public class PipeAgent {
    * @return the singleton instance of PipeTaskAgent
    */
   public static PipeTaskAgent task() {
-    return PipeTaskAgent.getInstance();
+    return PipeAgentHolder.HANDLE.pipeTaskAgent;
   }
 
   /**
@@ -39,7 +57,7 @@ public class PipeAgent {
    * @return the singleton instance of PipePluginAgent
    */
   public static PipePluginAgent plugin() {
-    return PipePluginAgent.getInstance();
+    return PipeAgentHolder.HANDLE.pipePluginAgent;
   }
 
   /**
@@ -48,6 +66,6 @@ public class PipeAgent {
    * @return the singleton instance of PipeRuntimeAgent
    */
   public static PipeRuntimeAgent runtime() {
-    return PipeRuntimeAgent.getInstance();
+    return PipeAgentHolder.HANDLE.pipeRuntimeAgent;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
index a0a62ca118..57cb0fb1b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
@@ -41,8 +41,7 @@ public class PipePluginAgent {
 
   private final ReentrantLock lock = new ReentrantLock();
 
-  private final DataNodePipePluginMetaKeeper pipePluginMetaKeeper =
-      new DataNodePipePluginMetaKeeper();
+  private final DataNodePipePluginMetaKeeper pipePluginMetaKeeper;
 
   /////////////////////////////// Lock ///////////////////////////////
 
@@ -181,13 +180,18 @@ public class PipePluginAgent {
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
-  private PipePluginAgent() {}
+  private PipePluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+    this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+  }
 
   private static class PipePluginAgentServiceHolder {
-    private static final PipePluginAgent INSTANCE = new PipePluginAgent();
+    private static PipePluginAgent instance = null;
   }
 
-  static PipePluginAgent getInstance() {
-    return PipePluginAgentServiceHolder.INSTANCE;
+  static PipePluginAgent setupAndGetInstance(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+    if (PipePluginAgentServiceHolder.instance == null) {
+      PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
+    }
+    return PipePluginAgentServiceHolder.instance;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
index fe9ab3ee8a..e42b1f66f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
@@ -26,10 +26,13 @@ public class PipeRuntimeAgent {
   private PipeRuntimeAgent() {}
 
   private static class PipeRuntimeAgentHolder {
-    private static final PipeRuntimeAgent INSTANCE = new PipeRuntimeAgent();
+    private static PipeRuntimeAgent INSTANCE = null;
   }
 
-  static PipeRuntimeAgent getInstance() {
-    return PipeRuntimeAgent.PipeRuntimeAgentHolder.INSTANCE;
+  static PipeRuntimeAgent setupAndGetInstance() {
+    if (PipeRuntimeAgentHolder.INSTANCE == null) {
+      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
+    }
+    return PipeRuntimeAgentHolder.INSTANCE;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
index 75c8e14636..5034fb50e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
@@ -26,10 +26,13 @@ public class PipeTaskAgent {
   private PipeTaskAgent() {}
 
   private static class PipeTaskAgentHolder {
-    private static final PipeTaskAgent INSTANCE = new PipeTaskAgent();
+    private static PipeTaskAgent instance = null;
   }
 
-  static PipeTaskAgent getInstance() {
-    return PipeTaskAgent.PipeTaskAgentHolder.INSTANCE;
+  static PipeTaskAgent setupAndGetInstance() {
+    if (PipeTaskAgentHolder.instance == null) {
+      PipeTaskAgentHolder.instance = new PipeTaskAgent();
+    }
+    return PipeTaskAgentHolder.instance;
   }
 }