You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/19 10:39:38 UTC

[iotdb] branch master updated: [IOTDB-5692] Pipe: DataNode skeleton code framework (#9373)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ad8940cbef [IOTDB-5692] Pipe: DataNode skeleton code framework (#9373)
ad8940cbef is described below

commit ad8940cbefc3935902c491e5157682f4235ad599
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Mar 19 18:39:31 2023 +0800

    [IOTDB-5692] Pipe: DataNode skeleton code framework (#9373)
---
 .../pipe/task/meta/PipeTaskMetaAccessor.java       |  22 +++++
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |  74 +++++++++++++++
 .../pipe/agent/{ => plugin}/PipePluginAgent.java   |  19 ++--
 .../db/pipe/agent/runtime/HeartbeatScheduler.java  |  23 +++++
 .../db/pipe/agent/runtime/MetaSyncScheduler.java   |  22 +++++
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  38 ++++++++
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  38 ++++++++
 .../db/pipe/agent/task/PipeTaskRegionAgent.java    |  22 +++++
 .../collector/PipeCollectorEventPendingQueue.java  |  22 +++++
 .../core/collector/PipeCollectorEventSelector.java |  22 +++++
 .../historical/PipeHistoricalCollector.java        |  22 +++++
 .../collector/realtime/PipeRealtimeCollector.java  |  22 +++++
 .../realtime/cache/PipeRealtimeEventCache.java     |  22 +++++
 .../realtime/listener/IoTLogListerner.java         |  22 +++++
 .../realtime/listener/RatisLogListener.java        |  22 +++++
 .../realtime/listener/SimpleLogListener.java       |  22 +++++
 .../listener/TsFileGenerationListener.java         |  22 +++++
 .../pipe/core/collector/realtime/matcher/Rule.java |  22 +++++
 .../realtime/matcher/RulePrefixMatchTree.java      |  22 +++++
 .../collector/realtime/recorder/TsFileEpoch.java   |  22 +++++
 .../realtime/recorder/TsFileEpochRecorder.java     |  22 +++++
 .../core/connector/PipeConnectorContainer.java     |  22 +++++
 .../pipe/core/connector/PipeConnectorManager.java  |  22 +++++
 .../PipeConnectorPluginRuntimeWrapper.java         |  31 +++++++
 .../pipe/core/event/PipeTabletInsertionEvent.java  |  46 ++++++++++
 .../pipe/core/event/PipeTsFileInsertionEvent.java  |  36 ++++++++
 .../iotdb/db/pipe/core/event/access/PipeRow.java   | 102 +++++++++++++++++++++
 .../db/pipe/core/event/access/PipeRowIterator.java |  60 ++++++++++++
 .../core/event/collector/PipeEventCollector.java   |  39 ++++++++
 .../core/event/collector/PipeRowCollector.java     |  31 +++++++
 .../pipe/core/event/indexer/PipeEventIndexer.java  |  22 +++++
 .../core/event/indexer/PipeIoTEventIndexer.java    |  22 +++++
 .../core/event/indexer/PipeRatisEventIndexer.java  |  22 +++++
 .../core/event/indexer/PipeSimpleEventIndexer.java |  22 +++++
 .../core/event/indexer/PipeTsFileEventIndexer.java |  22 +++++
 .../PipeProcessorPluginRuntimeWrapper.java         |  31 +++++++
 .../executor/PipeAssignerSubtaskExecutor.java      |  22 +++++
 .../executor/PipeConnectorSubtaskExecutor.java     |  22 +++++
 .../executor/PipeProcessorSubtaskExecutor.java     |  22 +++++
 .../execution/executor/PipeSubtaskExecutor.java    |  22 +++++
 .../pipe/execution/executor/PipeTaskExecutor.java  |  49 ++++++++++
 .../scheduler/PipeAssignerSubtaskScheduler.java    |  36 ++++++++
 .../scheduler/PipeConnectorSubtaskScheduler.java   |  36 ++++++++
 .../scheduler/PipeProcessorSubtaskScheduler.java   |  36 ++++++++
 .../execution/scheduler/PipeSubtaskScheduler.java  |  33 +++++++
 .../execution/scheduler/PipeTaskScheduler.java     |  60 ++++++++++++
 .../iotdb/db/pipe/resource/PipeFileManager.java    |  22 +++++
 .../iotdb/db/pipe/resource/PipeRaftlogHolder.java  |  22 +++++
 .../iotdb/db/pipe/resource/PipeTsFileHolder.java   |  22 +++++
 .../iotdb/db/pipe/resource/PipeWALHolder.java      |  22 +++++
 .../org/apache/iotdb/db/pipe/task/PipeTask.java    |  48 ++++++++++
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  23 +++++
 .../pipe/task/metrics/PipeTaskRuntimeRecorder.java |  22 +++++
 .../db/pipe/task/runnable/PipeAssignerSubtask.java |  30 ++++++
 .../pipe/task/runnable/PipeConnectorSubtask.java   |  30 ++++++
 .../pipe/task/runnable/PipeProcessorSubtask.java   |  30 ++++++
 .../iotdb/db/pipe/task/runnable/PipeSubtask.java   |  36 ++++++++
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  37 ++++++++
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  37 ++++++++
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  37 ++++++++
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |  53 +++++++++++
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   6 +-
 63 files changed, 1882 insertions(+), 11 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
new file mode 100644
index 0000000000..04653122c2
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+public class PipeTaskMetaAccessor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
new file mode 100644
index 0000000000..1c50fe5b40
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent;
+
+import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
+import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
+
+/** PipeAgent is the entry point of the pipe module in DatNode. */
+public class PipeAgent {
+
+  private final PipePluginAgent pipePluginAgent;
+  private final PipeTaskAgent pipeTaskAgent;
+  private final PipeRuntimeAgent pipeRuntimeAgent;
+
+  /** Private constructor to prevent users from creating a new instance. */
+  private PipeAgent() {
+    final DataNodePipePluginMetaKeeper pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
+
+    pipePluginAgent = PipePluginAgent.setupAndGetInstance(pipePluginMetaKeeper);
+    pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
+    pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
+  }
+
+  /** The singleton holder of PipeAgent. */
+  private static class PipeAgentHolder {
+    private static final PipeAgent HANDLE = new PipeAgent();
+  }
+
+  /**
+   * Get the singleton instance of PipeTaskAgent.
+   *
+   * @return the singleton instance of PipeTaskAgent
+   */
+  public static PipeTaskAgent task() {
+    return PipeAgentHolder.HANDLE.pipeTaskAgent;
+  }
+
+  /**
+   * Get the singleton instance of PipePluginAgent.
+   *
+   * @return the singleton instance of PipePluginAgent
+   */
+  public static PipePluginAgent plugin() {
+    return PipeAgentHolder.HANDLE.pipePluginAgent;
+  }
+
+  /**
+   * Get the singleton instance of PipeRuntimeAgent.
+   *
+   * @return the singleton instance of PipeRuntimeAgent
+   */
+  public static PipeRuntimeAgent runtime() {
+    return PipeAgentHolder.HANDLE.pipeRuntimeAgent;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index dc0565d4a6..d4c8359bcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.plugin;
 
 import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
@@ -41,8 +41,7 @@ public class PipePluginAgent {
 
   private final ReentrantLock lock = new ReentrantLock();
 
-  private final DataNodePipePluginMetaKeeper pipePluginMetaKeeper =
-      new DataNodePipePluginMetaKeeper();
+  private final DataNodePipePluginMetaKeeper pipePluginMetaKeeper;
 
   /////////////////////////////// Lock ///////////////////////////////
 
@@ -189,11 +188,19 @@ public class PipePluginAgent {
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
+  private PipePluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+    this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+  }
+
   private static class PipePluginAgentServiceHolder {
-    private static final PipePluginAgent INSTANCE = new PipePluginAgent();
+    private static PipePluginAgent instance = null;
   }
 
-  public static PipePluginAgent getInstance() {
-    return PipePluginAgentServiceHolder.INSTANCE;
+  public static PipePluginAgent setupAndGetInstance(
+      DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+    if (PipePluginAgentServiceHolder.instance == null) {
+      PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
+    }
+    return PipePluginAgentServiceHolder.instance;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
new file mode 100644
index 0000000000..de3f30f388
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */
+public class HeartbeatScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
new file mode 100644
index 0000000000..366176cd1c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+public class MetaSyncScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
new file mode 100644
index 0000000000..cbfe53be8b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+public class PipeRuntimeAgent {
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeRuntimeAgent() {}
+
+  private static class PipeRuntimeAgentHolder {
+    private static PipeRuntimeAgent INSTANCE = null;
+  }
+
+  public static PipeRuntimeAgent setupAndGetInstance() {
+    if (PipeRuntimeAgentHolder.INSTANCE == null) {
+      PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
+    }
+    return PipeRuntimeAgentHolder.INSTANCE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
new file mode 100644
index 0000000000..dd8a6984f0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task;
+
+public class PipeTaskAgent {
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskAgent() {}
+
+  private static class PipeTaskAgentHolder {
+    private static PipeTaskAgent instance = null;
+  }
+
+  public static PipeTaskAgent setupAndGetInstance() {
+    if (PipeTaskAgentHolder.instance == null) {
+      PipeTaskAgentHolder.instance = new PipeTaskAgent();
+    }
+    return PipeTaskAgentHolder.instance;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
new file mode 100644
index 0000000000..7be285ef66
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task;
+
+public class PipeTaskRegionAgent {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
new file mode 100644
index 0000000000..c3bfb68015
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+public class PipeCollectorEventPendingQueue {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
new file mode 100644
index 0000000000..e1c50a9601
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+public class PipeCollectorEventSelector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
new file mode 100644
index 0000000000..6e680de847
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.historical;
+
+public class PipeHistoricalCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
new file mode 100644
index 0000000000..b5fad778b0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+public class PipeRealtimeCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
new file mode 100644
index 0000000000..7525e06b54
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.cache;
+
+public class PipeRealtimeEventCache {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
new file mode 100644
index 0000000000..b91334430d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+public class IoTLogListerner {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
new file mode 100644
index 0000000000..3aa42354f5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+public class RatisLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
new file mode 100644
index 0000000000..b5e1eaf93b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+public class SimpleLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
new file mode 100644
index 0000000000..4c28795219
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+public class TsFileGenerationListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
new file mode 100644
index 0000000000..948fab2acd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
+
+public class Rule {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
new file mode 100644
index 0000000000..a543d7f0ef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
+
+public class RulePrefixMatchTree {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
new file mode 100644
index 0000000000..a85d3a0b70
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
+
+public class TsFileEpoch {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
new file mode 100644
index 0000000000..6948cef200
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
+
+public class TsFileEpochRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
new file mode 100644
index 0000000000..8651ed3df1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+public class PipeConnectorContainer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
new file mode 100644
index 0000000000..881edd67f8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+public class PipeConnectorManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
new file mode 100644
index 0000000000..c1c5be1166
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+
+public class PipeConnectorPluginRuntimeWrapper {
+
+  private final PipeConnector pipeConnector;
+
+  public PipeConnectorPluginRuntimeWrapper(PipeConnector pipeConnector) {
+    this.pipeConnector = pipeConnector;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
new file mode 100644
index 0000000000..a58e8db497
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+public class PipeTabletInsertionEvent implements TabletInsertionEvent {
+
+  @Override
+  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
+    return null;
+  }
+
+  @Override
+  public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, RowCollector> consumer) {
+    return null;
+  }
+
+  @Override
+  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
new file mode 100644
index 0000000000..d11b4f780d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
+
+  @Override
+  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+    return null;
+  }
+
+  @Override
+  public TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
new file mode 100644
index 0000000000..43b438445f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PipeRow implements Row {
+
+  @Override
+  public long getTime() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public int getInt(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public long getLong(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getFloat(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public double getDouble(int columnIndex) throws IOException {
+    return 0;
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) throws IOException {
+    return false;
+  }
+
+  @Override
+  public Binary getBinary(int columnIndex) throws IOException {
+    return null;
+  }
+
+  @Override
+  public String getString(int columnIndex) throws IOException {
+    return null;
+  }
+
+  @Override
+  public Type getDataType(int columnIndex) {
+    return null;
+  }
+
+  @Override
+  public boolean isNull(int columnIndex) {
+    return false;
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+    return 0;
+  }
+
+  @Override
+  public List<Path> getColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<Type> getColumnTypes() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
new file mode 100644
index 0000000000..960214bea6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.access.RowIterator;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PipeRowIterator implements RowIterator {
+
+  @Override
+  public boolean hasNextRow() {
+    return false;
+  }
+
+  @Override
+  public Row next() throws IOException {
+    return null;
+  }
+
+  @Override
+  public void reset() {}
+
+  @Override
+  public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+    return 0;
+  }
+
+  @Override
+  public List<Path> getColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<Type> getColumnTypes() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
new file mode 100644
index 0000000000..2c2bbfd38f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.collector;
+
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+import java.io.IOException;
+
+public class PipeEventCollector implements EventCollector {
+
+  @Override
+  public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {}
+
+  @Override
+  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {}
+
+  @Override
+  public void collectDeletionEvent(DeletionEvent event) throws IOException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
new file mode 100644
index 0000000000..525e79c137
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.collector;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+
+import java.io.IOException;
+
+public class PipeRowCollector implements RowCollector {
+
+  @Override
+  public void collectRow(Row row) throws IOException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
new file mode 100644
index 0000000000..22cbce9e04
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public interface PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
new file mode 100644
index 0000000000..63aff8a519
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public class PipeIoTEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
new file mode 100644
index 0000000000..4520855085
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public class PipeRatisEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
new file mode 100644
index 0000000000..d5add45547
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public class PipeSimpleEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
new file mode 100644
index 0000000000..be53e89751
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public class PipeTsFileEventIndexer implements PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
new file mode 100644
index 0000000000..3153d2ae0e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+
+public class PipeProcessorPluginRuntimeWrapper {
+
+  private final PipeProcessor pipeProcessor;
+
+  public PipeProcessorPluginRuntimeWrapper(PipeProcessor pipeProcessor) {
+    this.pipeProcessor = pipeProcessor;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
new file mode 100644
index 0000000000..cc3dd43987
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+public class PipeAssignerSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
new file mode 100644
index 0000000000..98eaf31d1b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+public class PipeConnectorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
new file mode 100644
index 0000000000..c61871fafe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+public class PipeProcessorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
new file mode 100644
index 0000000000..7d97605dff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+public interface PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
new file mode 100644
index 0000000000..4437fb119d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+/**
+ * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the
+ * PipeTaskScheduler. It is a singleton class.
+ */
+public class PipeTaskExecutor {
+
+  private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor =
+      new PipeAssignerSubtaskExecutor();
+  private final PipeProcessorSubtaskExecutor processorSubtaskExecutor =
+      new PipeProcessorSubtaskExecutor();
+  private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor =
+      new PipeConnectorSubtaskExecutor();
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskExecutor() {}
+
+  private static class PipeTaskExecutorHolder {
+    private static PipeTaskExecutor instance = null;
+  }
+
+  public static PipeTaskExecutor setupAndGetInstance() {
+    if (PipeTaskExecutorHolder.instance == null) {
+      PipeTaskExecutorHolder.instance = new PipeTaskExecutor();
+    }
+    return PipeTaskExecutorHolder.instance;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
new file mode 100644
index 0000000000..2cab31d737
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
+
+public class PipeAssignerSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
+
+  @Override
+  public void dropSubtask(String subtaskId) {}
+
+  @Override
+  public void startSubtask(String subtaskId) {}
+
+  @Override
+  public void stopSubtask(String subtaskId) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
new file mode 100644
index 0000000000..c53c6b040d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
+
+public class PipeConnectorSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
+
+  @Override
+  public void dropSubtask(String subtaskId) {}
+
+  @Override
+  public void startSubtask(String subtaskId) {}
+
+  @Override
+  public void stopSubtask(String subtaskId) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
new file mode 100644
index 0000000000..9f5df481b8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
+
+public class PipeProcessorSubtaskScheduler implements PipeSubtaskScheduler {
+  @Override
+  public void createSubtask(String subtaskId, PipeSubtask subtask) {}
+
+  @Override
+  public void dropSubtask(String subtaskId) {}
+
+  @Override
+  public void startSubtask(String subtaskId) {}
+
+  @Override
+  public void stopSubtask(String subtaskId) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
new file mode 100644
index 0000000000..c87f949103
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
+
+public interface PipeSubtaskScheduler {
+
+  void createSubtask(String subtaskId, PipeSubtask subtask);
+
+  void dropSubtask(String subtaskId);
+
+  void startSubtask(String subtaskId);
+
+  void stopSubtask(String subtaskId);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
new file mode 100644
index 0000000000..cda2cc9466
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.PipeTask;
+
+/**
+ * PipeTaskScheduler is responsible for scheduling the pipe tasks. It takes the pipe tasks and
+ * executes them in the PipeTaskExecutor. It is a singleton class.
+ */
+public class PipeTaskScheduler {
+
+  private final PipeSubtaskScheduler assignerSubtaskScheduler;
+  private final PipeSubtaskScheduler processorSubtaskScheduler;
+  private final PipeSubtaskScheduler connectorSubtaskScheduler;
+
+  public void createPipeTask(PipeTask pipeTask) {}
+
+  public void dropPipeTask(String pipeName) {}
+
+  public void startPipeTask(String pipeName) {}
+
+  public void stopPipeTask(String pipeName) {}
+
+  /////////////////////////  Singleton Instance Holder  /////////////////////////
+
+  private PipeTaskScheduler() {
+    assignerSubtaskScheduler = new PipeAssignerSubtaskScheduler();
+    processorSubtaskScheduler = new PipeProcessorSubtaskScheduler();
+    connectorSubtaskScheduler = new PipeConnectorSubtaskScheduler();
+  }
+
+  private static class PipeTaskSchedulerHolder {
+    private static PipeTaskScheduler instance = null;
+  }
+
+  public static PipeTaskScheduler setupAndGetInstance() {
+    if (PipeTaskSchedulerHolder.instance == null) {
+      PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
+    }
+    return PipeTaskSchedulerHolder.instance;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
new file mode 100644
index 0000000000..25ce3d1142
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+public class PipeFileManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
new file mode 100644
index 0000000000..984c3fbaf2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+public class PipeRaftlogHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
new file mode 100644
index 0000000000..04fe515e59
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+public class PipeTsFileHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
new file mode 100644
index 0000000000..78e0a7c612
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+public class PipeWALHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
new file mode 100644
index 0000000000..e3a2819deb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task;
+
+import org.apache.iotdb.db.pipe.task.metrics.PipeTaskRuntimeRecorder;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
+
+public class PipeTask {
+
+  private final String pipeName;
+
+  private final PipeTaskStage collectorStage;
+  private final PipeTaskStage processorStage;
+  private final PipeTaskStage connectorStage;
+
+  private final PipeTaskRuntimeRecorder runtimeRecorder;
+
+  public PipeTask(
+      String pipeName,
+      PipeTaskStage collectorStage,
+      PipeTaskStage processorStage,
+      PipeTaskStage connectorStage) {
+    this.pipeName = pipeName;
+
+    this.collectorStage = collectorStage;
+    this.processorStage = processorStage;
+    this.connectorStage = connectorStage;
+
+    runtimeRecorder = new PipeTaskRuntimeRecorder();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
new file mode 100644
index 0000000000..19e5f460ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task;
+
+/** PipeTaskBuilder is used to build a PipeTask. */
+public class PipeTaskBuilder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
new file mode 100644
index 0000000000..6f09614958
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.metrics;
+
+public class PipeTaskRuntimeRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
new file mode 100644
index 0000000000..5890801ea6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.runnable;
+
+public class PipeAssignerSubtask extends PipeSubtask {
+
+  public PipeAssignerSubtask(String taskID) {
+    super(taskID);
+  }
+
+  @Override
+  public void runMayThrow() throws Throwable {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
new file mode 100644
index 0000000000..b199607241
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.runnable;
+
+public class PipeConnectorSubtask extends PipeSubtask {
+
+  public PipeConnectorSubtask(String taskID) {
+    super(taskID);
+  }
+
+  @Override
+  public void runMayThrow() throws Throwable {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
new file mode 100644
index 0000000000..cfdf0123e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.runnable;
+
+public class PipeProcessorSubtask extends PipeSubtask {
+
+  public PipeProcessorSubtask(String taskID) {
+    super(taskID);
+  }
+
+  @Override
+  public void runMayThrow() throws Throwable {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
new file mode 100644
index 0000000000..daebd15e47
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.runnable;
+
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+
+public abstract class PipeSubtask extends WrappedRunnable {
+
+  private final String taskID;
+
+  public PipeSubtask(String taskID) {
+    super();
+    this.taskID = taskID;
+  }
+
+  public String getTaskID() {
+    return taskID;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
new file mode 100644
index 0000000000..930a06cc94
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.stage;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public class PipeTaskCollectorStage implements PipeTaskStage {
+
+  @Override
+  public void create() throws PipeException {}
+
+  @Override
+  public void start() throws PipeException {}
+
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
new file mode 100644
index 0000000000..fddb99fb5e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.stage;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public class PipeTaskConnectorStage implements PipeTaskStage {
+
+  @Override
+  public void create() throws PipeException {}
+
+  @Override
+  public void start() throws PipeException {}
+
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
new file mode 100644
index 0000000000..5a22721a8c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.stage;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public class PipeTaskProcessorStage implements PipeTaskStage {
+
+  @Override
+  public void create() throws PipeException {}
+
+  @Override
+  public void start() throws PipeException {}
+
+  @Override
+  public void stop() throws PipeException {}
+
+  @Override
+  public void drop() throws PipeException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
new file mode 100644
index 0000000000..09ae67ef76
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.stage;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public interface PipeTaskStage {
+
+  /**
+   * Create a pipe task stage.
+   *
+   * @throws PipeException if failed to create a pipe task stage.
+   */
+  void create() throws PipeException;
+
+  /**
+   * Start a pipe task stage.
+   *
+   * @throws PipeException if failed to start a pipe task stage.
+   */
+  void start() throws PipeException;
+
+  /**
+   * Stop a pipe task stage.
+   *
+   * @throws PipeException if failed to stop a pipe task stage.
+   */
+  void stop() throws PipeException;
+
+  /**
+   * Drop a pipe task stage.
+   *
+   * @throws PipeException if failed to drop a pipe task stage.
+   */
+  void drop() throws PipeException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index c86e919e6c..347a8e45e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -70,7 +70,7 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
-import org.apache.iotdb.db.pipe.agent.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.protocol.rest.RestService;
 import org.apache.iotdb.db.service.metrics.DataNodeMetricsHelper;
 import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
@@ -857,7 +857,7 @@ public class DataNode implements DataNodeMBean {
     // create instances of pipe plugins and do registration
     try {
       for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
-        PipePluginAgent.getInstance().doRegister(meta);
+        PipeAgent.plugin().doRegister(meta);
       }
     } catch (Exception e) {
       throw new StartupException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 60edda2b56..0b35014c49 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -102,7 +102,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-import org.apache.iotdb.db.pipe.agent.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
@@ -1482,7 +1482,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   public TSStatus createPipePlugin(TCreatePipePluginInstanceReq req) {
     try {
       PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(req.pipePluginMeta);
-      PipePluginAgent.getInstance().register(pipePluginMeta, req.jarFile);
+      PipeAgent.plugin().register(pipePluginMeta, req.jarFile);
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (Exception e) {
       return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())
@@ -1493,7 +1493,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   @Override
   public TSStatus dropPipePlugin(TDropPipePluginInstanceReq req) {
     try {
-      PipePluginAgent.getInstance().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
+      PipeAgent.plugin().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (Exception e) {
       return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())