You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/19 10:39:38 UTC
[iotdb] branch master updated: [IOTDB-5692] Pipe: DataNode skeleton code framework (#9373)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ad8940cbef [IOTDB-5692] Pipe: DataNode skeleton code framework (#9373)
ad8940cbef is described below
commit ad8940cbefc3935902c491e5157682f4235ad599
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Mar 19 18:39:31 2023 +0800
[IOTDB-5692] Pipe: DataNode skeleton code framework (#9373)
---
.../pipe/task/meta/PipeTaskMetaAccessor.java | 22 +++++
.../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 74 +++++++++++++++
.../pipe/agent/{ => plugin}/PipePluginAgent.java | 19 ++--
.../db/pipe/agent/runtime/HeartbeatScheduler.java | 23 +++++
.../db/pipe/agent/runtime/MetaSyncScheduler.java | 22 +++++
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 38 ++++++++
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 38 ++++++++
.../db/pipe/agent/task/PipeTaskRegionAgent.java | 22 +++++
.../collector/PipeCollectorEventPendingQueue.java | 22 +++++
.../core/collector/PipeCollectorEventSelector.java | 22 +++++
.../historical/PipeHistoricalCollector.java | 22 +++++
.../collector/realtime/PipeRealtimeCollector.java | 22 +++++
.../realtime/cache/PipeRealtimeEventCache.java | 22 +++++
.../realtime/listener/IoTLogListerner.java | 22 +++++
.../realtime/listener/RatisLogListener.java | 22 +++++
.../realtime/listener/SimpleLogListener.java | 22 +++++
.../listener/TsFileGenerationListener.java | 22 +++++
.../pipe/core/collector/realtime/matcher/Rule.java | 22 +++++
.../realtime/matcher/RulePrefixMatchTree.java | 22 +++++
.../collector/realtime/recorder/TsFileEpoch.java | 22 +++++
.../realtime/recorder/TsFileEpochRecorder.java | 22 +++++
.../core/connector/PipeConnectorContainer.java | 22 +++++
.../pipe/core/connector/PipeConnectorManager.java | 22 +++++
.../PipeConnectorPluginRuntimeWrapper.java | 31 +++++++
.../pipe/core/event/PipeTabletInsertionEvent.java | 46 ++++++++++
.../pipe/core/event/PipeTsFileInsertionEvent.java | 36 ++++++++
.../iotdb/db/pipe/core/event/access/PipeRow.java | 102 +++++++++++++++++++++
.../db/pipe/core/event/access/PipeRowIterator.java | 60 ++++++++++++
.../core/event/collector/PipeEventCollector.java | 39 ++++++++
.../core/event/collector/PipeRowCollector.java | 31 +++++++
.../pipe/core/event/indexer/PipeEventIndexer.java | 22 +++++
.../core/event/indexer/PipeIoTEventIndexer.java | 22 +++++
.../core/event/indexer/PipeRatisEventIndexer.java | 22 +++++
.../core/event/indexer/PipeSimpleEventIndexer.java | 22 +++++
.../core/event/indexer/PipeTsFileEventIndexer.java | 22 +++++
.../PipeProcessorPluginRuntimeWrapper.java | 31 +++++++
.../executor/PipeAssignerSubtaskExecutor.java | 22 +++++
.../executor/PipeConnectorSubtaskExecutor.java | 22 +++++
.../executor/PipeProcessorSubtaskExecutor.java | 22 +++++
.../execution/executor/PipeSubtaskExecutor.java | 22 +++++
.../pipe/execution/executor/PipeTaskExecutor.java | 49 ++++++++++
.../scheduler/PipeAssignerSubtaskScheduler.java | 36 ++++++++
.../scheduler/PipeConnectorSubtaskScheduler.java | 36 ++++++++
.../scheduler/PipeProcessorSubtaskScheduler.java | 36 ++++++++
.../execution/scheduler/PipeSubtaskScheduler.java | 33 +++++++
.../execution/scheduler/PipeTaskScheduler.java | 60 ++++++++++++
.../iotdb/db/pipe/resource/PipeFileManager.java | 22 +++++
.../iotdb/db/pipe/resource/PipeRaftlogHolder.java | 22 +++++
.../iotdb/db/pipe/resource/PipeTsFileHolder.java | 22 +++++
.../iotdb/db/pipe/resource/PipeWALHolder.java | 22 +++++
.../org/apache/iotdb/db/pipe/task/PipeTask.java | 48 ++++++++++
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 23 +++++
.../pipe/task/metrics/PipeTaskRuntimeRecorder.java | 22 +++++
.../db/pipe/task/runnable/PipeAssignerSubtask.java | 30 ++++++
.../pipe/task/runnable/PipeConnectorSubtask.java | 30 ++++++
.../pipe/task/runnable/PipeProcessorSubtask.java | 30 ++++++
.../iotdb/db/pipe/task/runnable/PipeSubtask.java | 36 ++++++++
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 37 ++++++++
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 37 ++++++++
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 37 ++++++++
.../iotdb/db/pipe/task/stage/PipeTaskStage.java | 53 +++++++++++
.../java/org/apache/iotdb/db/service/DataNode.java | 4 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 6 +-
63 files changed, 1882 insertions(+), 11 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
new file mode 100644
index 0000000000..04653122c2
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+public class PipeTaskMetaAccessor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
new file mode 100644
index 0000000000..1c50fe5b40
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent;
+
+import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
+import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
+
+/** PipeAgent is the entry point of the pipe module in DatNode. */
+public class PipeAgent {
+
+ private final PipePluginAgent pipePluginAgent;
+ private final PipeTaskAgent pipeTaskAgent;
+ private final PipeRuntimeAgent pipeRuntimeAgent;
+
+ /** Private constructor to prevent users from creating a new instance. */
+ private PipeAgent() {
+ final DataNodePipePluginMetaKeeper pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
+
+ pipePluginAgent = PipePluginAgent.setupAndGetInstance(pipePluginMetaKeeper);
+ pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
+ pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
+ }
+
+ /** The singleton holder of PipeAgent. */
+ private static class PipeAgentHolder {
+ private static final PipeAgent HANDLE = new PipeAgent();
+ }
+
+ /**
+ * Get the singleton instance of PipeTaskAgent.
+ *
+ * @return the singleton instance of PipeTaskAgent
+ */
+ public static PipeTaskAgent task() {
+ return PipeAgentHolder.HANDLE.pipeTaskAgent;
+ }
+
+ /**
+ * Get the singleton instance of PipePluginAgent.
+ *
+ * @return the singleton instance of PipePluginAgent
+ */
+ public static PipePluginAgent plugin() {
+ return PipeAgentHolder.HANDLE.pipePluginAgent;
+ }
+
+ /**
+ * Get the singleton instance of PipeRuntimeAgent.
+ *
+ * @return the singleton instance of PipeRuntimeAgent
+ */
+ public static PipeRuntimeAgent runtime() {
+ return PipeAgentHolder.HANDLE.pipeRuntimeAgent;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index dc0565d4a6..d4c8359bcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.plugin;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
@@ -41,8 +41,7 @@ public class PipePluginAgent {
private final ReentrantLock lock = new ReentrantLock();
- private final DataNodePipePluginMetaKeeper pipePluginMetaKeeper =
- new DataNodePipePluginMetaKeeper();
+ private final DataNodePipePluginMetaKeeper pipePluginMetaKeeper;
/////////////////////////////// Lock ///////////////////////////////
@@ -189,11 +188,19 @@ public class PipePluginAgent {
///////////////////////// Singleton Instance Holder /////////////////////////
+ private PipePluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+ this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+ }
+
private static class PipePluginAgentServiceHolder {
- private static final PipePluginAgent INSTANCE = new PipePluginAgent();
+ private static PipePluginAgent instance = null;
}
- public static PipePluginAgent getInstance() {
- return PipePluginAgentServiceHolder.INSTANCE;
+ public static PipePluginAgent setupAndGetInstance(
+ DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+ if (PipePluginAgentServiceHolder.instance == null) {
+ PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
+ }
+ return PipePluginAgentServiceHolder.instance;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
new file mode 100644
index 0000000000..de3f30f388
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */
+public class HeartbeatScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
new file mode 100644
index 0000000000..366176cd1c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+public class MetaSyncScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
new file mode 100644
index 0000000000..cbfe53be8b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+public class PipeRuntimeAgent {
+
+ ///////////////////////// Singleton Instance Holder /////////////////////////
+
+ private PipeRuntimeAgent() {}
+
+ private static class PipeRuntimeAgentHolder {
+ private static PipeRuntimeAgent INSTANCE = null;
+ }
+
+ public static PipeRuntimeAgent setupAndGetInstance() {
+ if (PipeRuntimeAgentHolder.INSTANCE == null) {
+ PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
+ }
+ return PipeRuntimeAgentHolder.INSTANCE;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
new file mode 100644
index 0000000000..dd8a6984f0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task;
+
+public class PipeTaskAgent {
+
+ ///////////////////////// Singleton Instance Holder /////////////////////////
+
+ private PipeTaskAgent() {}
+
+ private static class PipeTaskAgentHolder {
+ private static PipeTaskAgent instance = null;
+ }
+
+ public static PipeTaskAgent setupAndGetInstance() {
+ if (PipeTaskAgentHolder.instance == null) {
+ PipeTaskAgentHolder.instance = new PipeTaskAgent();
+ }
+ return PipeTaskAgentHolder.instance;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
new file mode 100644
index 0000000000..7be285ef66
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.task;
+
+public class PipeTaskRegionAgent {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
new file mode 100644
index 0000000000..c3bfb68015
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+public class PipeCollectorEventPendingQueue {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
new file mode 100644
index 0000000000..e1c50a9601
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector;
+
+public class PipeCollectorEventSelector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
new file mode 100644
index 0000000000..6e680de847
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.historical;
+
+public class PipeHistoricalCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
new file mode 100644
index 0000000000..b5fad778b0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+public class PipeRealtimeCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
new file mode 100644
index 0000000000..7525e06b54
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.cache;
+
+public class PipeRealtimeEventCache {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
new file mode 100644
index 0000000000..b91334430d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+public class IoTLogListerner {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
new file mode 100644
index 0000000000..3aa42354f5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+public class RatisLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
new file mode 100644
index 0000000000..b5e1eaf93b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+public class SimpleLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
new file mode 100644
index 0000000000..4c28795219
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
+
+public class TsFileGenerationListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
new file mode 100644
index 0000000000..948fab2acd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
+
+public class Rule {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
new file mode 100644
index 0000000000..a543d7f0ef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
+
+public class RulePrefixMatchTree {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
new file mode 100644
index 0000000000..a85d3a0b70
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
+
+public class TsFileEpoch {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
new file mode 100644
index 0000000000..6948cef200
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
+
+public class TsFileEpochRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
new file mode 100644
index 0000000000..8651ed3df1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+public class PipeConnectorContainer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
new file mode 100644
index 0000000000..881edd67f8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+public class PipeConnectorManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
new file mode 100644
index 0000000000..c1c5be1166
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+
+public class PipeConnectorPluginRuntimeWrapper {
+
+ private final PipeConnector pipeConnector;
+
+ public PipeConnectorPluginRuntimeWrapper(PipeConnector pipeConnector) {
+ this.pipeConnector = pipeConnector;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
new file mode 100644
index 0000000000..a58e8db497
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+public class PipeTabletInsertionEvent implements TabletInsertionEvent {
+
+ @Override
+ public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
+ return null;
+ }
+
+ @Override
+ public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, RowCollector> consumer) {
+ return null;
+ }
+
+ @Override
+ public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
new file mode 100644
index 0000000000..d11b4f780d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
+
+ @Override
+ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+ return null;
+ }
+
+ @Override
+ public TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
new file mode 100644
index 0000000000..43b438445f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PipeRow implements Row {
+
+ @Override
+ public long getTime() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int getInt(int columnIndex) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long getLong(int columnIndex) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public float getFloat(int columnIndex) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public double getDouble(int columnIndex) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public boolean getBoolean(int columnIndex) throws IOException {
+ return false;
+ }
+
+ @Override
+ public Binary getBinary(int columnIndex) throws IOException {
+ return null;
+ }
+
+ @Override
+ public String getString(int columnIndex) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Type getDataType(int columnIndex) {
+ return null;
+ }
+
+ @Override
+ public boolean isNull(int columnIndex) {
+ return false;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+ return 0;
+ }
+
+ @Override
+ public List<Path> getColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<Type> getColumnTypes() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
new file mode 100644
index 0000000000..960214bea6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.access.RowIterator;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PipeRowIterator implements RowIterator {
+
+ @Override
+ public boolean hasNextRow() {
+ return false;
+ }
+
+ @Override
+ public Row next() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void reset() {}
+
+ @Override
+ public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+ return 0;
+ }
+
+ @Override
+ public List<Path> getColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<Type> getColumnTypes() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
new file mode 100644
index 0000000000..2c2bbfd38f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.collector;
+
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+import java.io.IOException;
+
+public class PipeEventCollector implements EventCollector {
+
+ @Override
+ public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {}
+
+ @Override
+ public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {}
+
+ @Override
+ public void collectDeletionEvent(DeletionEvent event) throws IOException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
new file mode 100644
index 0000000000..525e79c137
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.collector;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+
+import java.io.IOException;
+
+public class PipeRowCollector implements RowCollector {
+
+ @Override
+ public void collectRow(Row row) throws IOException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
new file mode 100644
index 0000000000..22cbce9e04
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public interface PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
new file mode 100644
index 0000000000..63aff8a519
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public class PipeIoTEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
new file mode 100644
index 0000000000..4520855085
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public class PipeRatisEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
new file mode 100644
index 0000000000..d5add45547
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public class PipeSimpleEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
new file mode 100644
index 0000000000..be53e89751
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.indexer;
+
+public class PipeTsFileEventIndexer implements PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
new file mode 100644
index 0000000000..3153d2ae0e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+
+public class PipeProcessorPluginRuntimeWrapper {
+
+ private final PipeProcessor pipeProcessor;
+
+ public PipeProcessorPluginRuntimeWrapper(PipeProcessor pipeProcessor) {
+ this.pipeProcessor = pipeProcessor;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
new file mode 100644
index 0000000000..cc3dd43987
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+public class PipeAssignerSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
new file mode 100644
index 0000000000..98eaf31d1b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+public class PipeConnectorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
new file mode 100644
index 0000000000..c61871fafe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+public class PipeProcessorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
new file mode 100644
index 0000000000..7d97605dff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+public interface PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
new file mode 100644
index 0000000000..4437fb119d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+/**
+ * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the
+ * PipeTaskScheduler. It is a singleton class.
+ */
+public class PipeTaskExecutor {
+
+ private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor =
+ new PipeAssignerSubtaskExecutor();
+ private final PipeProcessorSubtaskExecutor processorSubtaskExecutor =
+ new PipeProcessorSubtaskExecutor();
+ private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor =
+ new PipeConnectorSubtaskExecutor();
+
+ ///////////////////////// Singleton Instance Holder /////////////////////////
+
+ private PipeTaskExecutor() {}
+
+ private static class PipeTaskExecutorHolder {
+ private static PipeTaskExecutor instance = null;
+ }
+
+ public static PipeTaskExecutor setupAndGetInstance() {
+ if (PipeTaskExecutorHolder.instance == null) {
+ PipeTaskExecutorHolder.instance = new PipeTaskExecutor();
+ }
+ return PipeTaskExecutorHolder.instance;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
new file mode 100644
index 0000000000..2cab31d737
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
+
+public class PipeAssignerSubtaskScheduler implements PipeSubtaskScheduler {
+ @Override
+ public void createSubtask(String subtaskId, PipeSubtask subtask) {}
+
+ @Override
+ public void dropSubtask(String subtaskId) {}
+
+ @Override
+ public void startSubtask(String subtaskId) {}
+
+ @Override
+ public void stopSubtask(String subtaskId) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
new file mode 100644
index 0000000000..c53c6b040d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
+
+public class PipeConnectorSubtaskScheduler implements PipeSubtaskScheduler {
+ @Override
+ public void createSubtask(String subtaskId, PipeSubtask subtask) {}
+
+ @Override
+ public void dropSubtask(String subtaskId) {}
+
+ @Override
+ public void startSubtask(String subtaskId) {}
+
+ @Override
+ public void stopSubtask(String subtaskId) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
new file mode 100644
index 0000000000..9f5df481b8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
+
+public class PipeProcessorSubtaskScheduler implements PipeSubtaskScheduler {
+ @Override
+ public void createSubtask(String subtaskId, PipeSubtask subtask) {}
+
+ @Override
+ public void dropSubtask(String subtaskId) {}
+
+ @Override
+ public void startSubtask(String subtaskId) {}
+
+ @Override
+ public void stopSubtask(String subtaskId) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
new file mode 100644
index 0000000000..c87f949103
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
+
+public interface PipeSubtaskScheduler {
+
+ void createSubtask(String subtaskId, PipeSubtask subtask);
+
+ void dropSubtask(String subtaskId);
+
+ void startSubtask(String subtaskId);
+
+ void stopSubtask(String subtaskId);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
new file mode 100644
index 0000000000..cda2cc9466
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.PipeTask;
+
+/**
+ * PipeTaskScheduler is responsible for scheduling the pipe tasks. It takes the pipe tasks and
+ * executes them in the PipeTaskExecutor. It is a singleton class.
+ */
+public class PipeTaskScheduler {
+
+ private final PipeSubtaskScheduler assignerSubtaskScheduler;
+ private final PipeSubtaskScheduler processorSubtaskScheduler;
+ private final PipeSubtaskScheduler connectorSubtaskScheduler;
+
+ public void createPipeTask(PipeTask pipeTask) {}
+
+ public void dropPipeTask(String pipeName) {}
+
+ public void startPipeTask(String pipeName) {}
+
+ public void stopPipeTask(String pipeName) {}
+
+ ///////////////////////// Singleton Instance Holder /////////////////////////
+
+ private PipeTaskScheduler() {
+ assignerSubtaskScheduler = new PipeAssignerSubtaskScheduler();
+ processorSubtaskScheduler = new PipeProcessorSubtaskScheduler();
+ connectorSubtaskScheduler = new PipeConnectorSubtaskScheduler();
+ }
+
+ private static class PipeTaskSchedulerHolder {
+ private static PipeTaskScheduler instance = null;
+ }
+
+ public static PipeTaskScheduler setupAndGetInstance() {
+ if (PipeTaskSchedulerHolder.instance == null) {
+ PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
+ }
+ return PipeTaskSchedulerHolder.instance;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
new file mode 100644
index 0000000000..25ce3d1142
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+public class PipeFileManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
new file mode 100644
index 0000000000..984c3fbaf2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+public class PipeRaftlogHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
new file mode 100644
index 0000000000..04fe515e59
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+public class PipeTsFileHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
new file mode 100644
index 0000000000..78e0a7c612
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource;
+
+public class PipeWALHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
new file mode 100644
index 0000000000..e3a2819deb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task;
+
+import org.apache.iotdb.db.pipe.task.metrics.PipeTaskRuntimeRecorder;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
+
+public class PipeTask {
+
+ private final String pipeName;
+
+ private final PipeTaskStage collectorStage;
+ private final PipeTaskStage processorStage;
+ private final PipeTaskStage connectorStage;
+
+ private final PipeTaskRuntimeRecorder runtimeRecorder;
+
+ public PipeTask(
+ String pipeName,
+ PipeTaskStage collectorStage,
+ PipeTaskStage processorStage,
+ PipeTaskStage connectorStage) {
+ this.pipeName = pipeName;
+
+ this.collectorStage = collectorStage;
+ this.processorStage = processorStage;
+ this.connectorStage = connectorStage;
+
+ runtimeRecorder = new PipeTaskRuntimeRecorder();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
new file mode 100644
index 0000000000..19e5f460ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task;
+
+/** PipeTaskBuilder is used to build a PipeTask. */
+public class PipeTaskBuilder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
new file mode 100644
index 0000000000..6f09614958
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.metrics;
+
+public class PipeTaskRuntimeRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
new file mode 100644
index 0000000000..5890801ea6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.runnable;
+
+public class PipeAssignerSubtask extends PipeSubtask {
+
+ public PipeAssignerSubtask(String taskID) {
+ super(taskID);
+ }
+
+ @Override
+ public void runMayThrow() throws Throwable {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
new file mode 100644
index 0000000000..b199607241
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.runnable;
+
+public class PipeConnectorSubtask extends PipeSubtask {
+
+ public PipeConnectorSubtask(String taskID) {
+ super(taskID);
+ }
+
+ @Override
+ public void runMayThrow() throws Throwable {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
new file mode 100644
index 0000000000..cfdf0123e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.runnable;
+
+public class PipeProcessorSubtask extends PipeSubtask {
+
+ public PipeProcessorSubtask(String taskID) {
+ super(taskID);
+ }
+
+ @Override
+ public void runMayThrow() throws Throwable {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
new file mode 100644
index 0000000000..daebd15e47
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.runnable;
+
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+
+public abstract class PipeSubtask extends WrappedRunnable {
+
+ private final String taskID;
+
+ public PipeSubtask(String taskID) {
+ super();
+ this.taskID = taskID;
+ }
+
+ public String getTaskID() {
+ return taskID;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
new file mode 100644
index 0000000000..930a06cc94
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.stage;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public class PipeTaskCollectorStage implements PipeTaskStage {
+
+ @Override
+ public void create() throws PipeException {}
+
+ @Override
+ public void start() throws PipeException {}
+
+ @Override
+ public void stop() throws PipeException {}
+
+ @Override
+ public void drop() throws PipeException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
new file mode 100644
index 0000000000..fddb99fb5e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.stage;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public class PipeTaskConnectorStage implements PipeTaskStage {
+
+ @Override
+ public void create() throws PipeException {}
+
+ @Override
+ public void start() throws PipeException {}
+
+ @Override
+ public void stop() throws PipeException {}
+
+ @Override
+ public void drop() throws PipeException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
new file mode 100644
index 0000000000..5a22721a8c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.stage;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public class PipeTaskProcessorStage implements PipeTaskStage {
+
+ @Override
+ public void create() throws PipeException {}
+
+ @Override
+ public void start() throws PipeException {}
+
+ @Override
+ public void stop() throws PipeException {}
+
+ @Override
+ public void drop() throws PipeException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
new file mode 100644
index 0000000000..09ae67ef76
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.stage;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+public interface PipeTaskStage {
+
+ /**
+ * Create a pipe task stage.
+ *
+ * @throws PipeException if failed to create a pipe task stage.
+ */
+ void create() throws PipeException;
+
+ /**
+ * Start a pipe task stage.
+ *
+ * @throws PipeException if failed to start a pipe task stage.
+ */
+ void start() throws PipeException;
+
+ /**
+ * Stop a pipe task stage.
+ *
+ * @throws PipeException if failed to stop a pipe task stage.
+ */
+ void stop() throws PipeException;
+
+ /**
+ * Drop a pipe task stage.
+ *
+ * @throws PipeException if failed to drop a pipe task stage.
+ */
+ void drop() throws PipeException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index c86e919e6c..347a8e45e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -70,7 +70,7 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
-import org.apache.iotdb.db.pipe.agent.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.protocol.rest.RestService;
import org.apache.iotdb.db.service.metrics.DataNodeMetricsHelper;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
@@ -857,7 +857,7 @@ public class DataNode implements DataNodeMBean {
// create instances of pipe plugins and do registration
try {
for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
- PipePluginAgent.getInstance().doRegister(meta);
+ PipeAgent.plugin().doRegister(meta);
}
} catch (Exception e) {
throw new StartupException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 60edda2b56..0b35014c49 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -102,7 +102,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-import org.apache.iotdb.db.pipe.agent.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.query.control.clientsession.InternalClientSession;
@@ -1482,7 +1482,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
public TSStatus createPipePlugin(TCreatePipePluginInstanceReq req) {
try {
PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(req.pipePluginMeta);
- PipePluginAgent.getInstance().register(pipePluginMeta, req.jarFile);
+ PipeAgent.plugin().register(pipePluginMeta, req.jarFile);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())
@@ -1493,7 +1493,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public TSStatus dropPipePlugin(TDropPipePluginInstanceReq req) {
try {
- PipePluginAgent.getInstance().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
+ PipeAgent.plugin().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())