You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/18 21:16:57 UTC
[iotdb] 03/03: Pipe: Skeleton Code Framework
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch IOTDB-5692
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0c0b21bd79e098c88300c98887bf4dabbc44e66d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Mar 19 05:16:29 2023 +0800
Pipe: Skeleton Code Framework
---
.../pipe/task/meta/PipeTaskMetaAccessor.java | 20 +---
.../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 3 +
.../pipe/agent/{ => plugin}/PipePluginAgent.java | 5 +-
.../HeartbeatScheduler.java} | 21 +----
.../MetaSyncScheduler.java} | 20 +---
.../pipe/agent/{ => runtime}/PipeRuntimeAgent.java | 4 +-
.../db/pipe/agent/{ => task}/PipeTaskAgent.java | 4 +-
.../PipeTaskRegionAgent.java} | 20 +---
.../collector/PipeCollectorEventPendingQueue.java} | 20 +---
.../collector/PipeCollectorEventSelector.java} | 20 +---
.../historical/PipeHistoricalCollector.java} | 20 +---
.../collector/realtime/PipeRealtimeCollector.java} | 20 +---
.../realtime/cache/PipeRealtimeEventCache.java} | 20 +---
.../realtime/listener/IoTLogListerner.java} | 20 +---
.../realtime/listener/RatisLogListener.java} | 20 +---
.../realtime/listener/SimpleLogListener.java} | 20 +---
.../listener/TsFileGenerationListener.java} | 20 +---
.../collector/realtime/matcher/Rule.java} | 20 +---
.../realtime/matcher/RulePrefixMatchTree.java} | 20 +---
.../collector/realtime/recorder/TsFileEpoch.java} | 20 +---
.../realtime/recorder/TsFileEpochRecorder.java} | 20 +---
.../connector/PipeConnectorContainer.java} | 20 +---
.../connector/PipeConnectorManager.java} | 20 +---
.../PipeConnectorPluginRuntimeWrapper.java} | 19 ++--
.../event/PipeTabletInsertionEvent.java} | 30 +++---
.../event/PipeTsFileInsertionEvent.java} | 22 ++---
.../iotdb/db/pipe/core/event/access/PipeRow.java | 102 +++++++++++++++++++++
.../db/pipe/core/event/access/PipeRowIterator.java | 60 ++++++++++++
.../event/collector/PipeEventCollector.java} | 27 +++---
.../event/collector/PipeRowCollector.java} | 21 ++---
.../event/indexer/PipeEventIndexer.java} | 20 +---
.../event/indexer/PipeIoTEventIndexer.java} | 20 +---
.../event/indexer/PipeRatisEventIndexer.java} | 20 +---
.../event/indexer/PipeSimpleEventIndexer.java} | 20 +---
.../event/indexer/PipeTsFileEventIndexer.java} | 20 +---
.../PipeProcessorPluginRuntimeWrapper.java} | 19 ++--
.../executor/PipeAssignerSubtaskExecutor.java} | 20 +---
.../executor/PipeConnectorSubtaskExecutor.java} | 20 +---
.../executor/PipeProcessorSubtaskExecutor.java} | 20 +---
.../executor/PipeSubtaskExecutor.java} | 20 +---
.../pipe/execution/executor/PipeTaskExecutor.java | 49 ++++++++++
.../scheduler/PipeAssignerSubtaskScheduler.java} | 24 +++--
.../scheduler/PipeConnectorSubtaskScheduler.java} | 24 +++--
.../scheduler/PipeProcessorSubtaskScheduler.java} | 24 +++--
.../scheduler/PipeSubtaskScheduler.java} | 21 ++---
.../execution/scheduler/PipeTaskScheduler.java | 60 ++++++++++++
.../PipeFileManager.java} | 20 +---
.../PipeRaftlogHolder.java} | 20 +---
.../PipeTsFileHolder.java} | 20 +---
.../PipeWALHolder.java} | 20 +---
.../PipeRuntimeAgent.java => task/PipeTask.java} | 34 ++++---
.../PipeTaskBuilder.java} | 21 +----
.../metrics/PipeTaskRuntimeRecorder.java} | 20 +---
.../runnable/PipeAssignerSubtask.java} | 20 ++--
.../runnable/PipeConnectorSubtask.java} | 20 ++--
.../runnable/PipeProcessorSubtask.java} | 20 ++--
.../runnable/PipeSubtask.java} | 20 ++--
.../stage/PipeTaskCollectorStage.java} | 25 +++--
.../stage/PipeTaskConnectorStage.java} | 25 +++--
.../stage/PipeTaskProcessorStage.java} | 25 +++--
.../stage/PipeTaskStage.java} | 41 ++++++---
.../impl/DataNodeInternalRPCServiceImpl.java | 6 +-
62 files changed, 571 insertions(+), 865 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
index 5034fb50e7..04653122c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.commons.pipe.task.meta;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeTaskMetaAccessor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 7e3b3a1e87..1c50fe5b40 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.pipe.agent;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
+import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
+import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
/** PipeAgent is the entry point of the pipe module in DatNode. */
public class PipeAgent {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 57cb0fb1b3..6831ac6a34 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.plugin;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
@@ -188,7 +188,8 @@ public class PipePluginAgent {
private static PipePluginAgent instance = null;
}
- static PipePluginAgent setupAndGetInstance(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+ public static PipePluginAgent setupAndGetInstance(
+ DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
if (PipePluginAgentServiceHolder.instance == null) {
PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
index 5034fb50e7..de3f30f388 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
@@ -17,22 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */
+public class HeartbeatScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
index 5034fb50e7..366176cd1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/MetaSyncScheduler.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class MetaSyncScheduler {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
similarity index 92%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index e42b1f66f3..cbfe53be8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.runtime;
public class PipeRuntimeAgent {
@@ -29,7 +29,7 @@ public class PipeRuntimeAgent {
private static PipeRuntimeAgent INSTANCE = null;
}
- static PipeRuntimeAgent setupAndGetInstance() {
+ public static PipeRuntimeAgent setupAndGetInstance() {
if (PipeRuntimeAgentHolder.INSTANCE == null) {
PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
similarity index 92%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 5034fb50e7..dd8a6984f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.task;
public class PipeTaskAgent {
@@ -29,7 +29,7 @@ public class PipeTaskAgent {
private static PipeTaskAgent instance = null;
}
- static PipeTaskAgent setupAndGetInstance() {
+ public static PipeTaskAgent setupAndGetInstance() {
if (PipeTaskAgentHolder.instance == null) {
PipeTaskAgentHolder.instance = new PipeTaskAgent();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
index 5034fb50e7..7be285ef66 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskRegionAgent.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.agent.task;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeTaskRegionAgent {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
index 5034fb50e7..c3bfb68015 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventPendingQueue.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeCollectorEventPendingQueue {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
index 5034fb50e7..e1c50a9601 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/PipeCollectorEventSelector.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeCollectorEventSelector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
index 5034fb50e7..6e680de847 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalCollector.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.historical;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeHistoricalCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
index 5034fb50e7..b5fad778b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeCollector.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeRealtimeCollector {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
index 5034fb50e7..7525e06b54 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/cache/PipeRealtimeEventCache.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.cache;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeRealtimeEventCache {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
index 5034fb50e7..b91334430d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/IoTLogListerner.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class IoTLogListerner {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
index 5034fb50e7..3aa42354f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/RatisLogListener.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class RatisLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
index 5034fb50e7..b5e1eaf93b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/SimpleLogListener.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class SimpleLogListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
index 5034fb50e7..4c28795219 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/TsFileGenerationListener.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.listener;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class TsFileGenerationListener {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
index 5034fb50e7..948fab2acd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/Rule.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class Rule {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
index 5034fb50e7..a543d7f0ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/RulePrefixMatchTree.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class RulePrefixMatchTree {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
index 5034fb50e7..a85d3a0b70 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpoch.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class TsFileEpoch {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
index 5034fb50e7..6948cef200 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/recorder/TsFileEpochRecorder.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.collector.realtime.recorder;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class TsFileEpochRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
index 5034fb50e7..8651ed3df1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorContainer.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeConnectorContainer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
index 5034fb50e7..881edd67f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorManager.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeConnectorManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
index 5034fb50e7..c1c5be1166 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorPluginRuntimeWrapper.java
@@ -17,22 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.connector;
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.PipeConnector;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeConnectorPluginRuntimeWrapper {
- private PipeTaskAgent() {}
+ private final PipeConnector pipeConnector;
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
+ public PipeConnectorPluginRuntimeWrapper(PipeConnector pipeConnector) {
+ this.pipeConnector = pipeConnector;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
index e42b1f66f3..a58e8db497 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEvent.java
@@ -17,22 +17,30 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event;
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.write.record.Tablet;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+import java.util.Iterator;
+import java.util.function.BiConsumer;
- private PipeRuntimeAgent() {}
+public class PipeTabletInsertionEvent implements TabletInsertionEvent {
- private static class PipeRuntimeAgentHolder {
- private static PipeRuntimeAgent INSTANCE = null;
+ @Override
+ public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
+ return null;
}
- static PipeRuntimeAgent setupAndGetInstance() {
- if (PipeRuntimeAgentHolder.INSTANCE == null) {
- PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
- }
- return PipeRuntimeAgentHolder.INSTANCE;
+ @Override
+ public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, RowCollector> consumer) {
+ return null;
+ }
+
+ @Override
+ public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) {
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
index 5034fb50e7..d11b4f780d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/PipeTsFileInsertionEvent.java
@@ -17,22 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event;
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
+ @Override
+ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+ return null;
}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
+ @Override
+ public TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable) {
+ return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
new file mode 100644
index 0000000000..43b438445f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRow.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Skeleton implementation of {@link Row}.
+ *
+ * <p>The class holds no state: every accessor below ignores its argument and returns a fixed
+ * placeholder ({@code 0}, {@code false} or {@code null}), and none of them currently throws even
+ * where the signature declares an exception. Several methods return {@code null}, so callers
+ * must not dereference their results.
+ */
+public class PipeRow implements Row {
+
+ // Placeholder: always 0.
+ @Override
+ public long getTime() throws IOException {
+ return 0;
+ }
+
+ // Placeholder: always 0, whatever columnIndex is.
+ @Override
+ public int getInt(int columnIndex) throws IOException {
+ return 0;
+ }
+
+ // Placeholder: always 0, whatever columnIndex is.
+ @Override
+ public long getLong(int columnIndex) throws IOException {
+ return 0;
+ }
+
+ // Placeholder: always 0, whatever columnIndex is.
+ @Override
+ public float getFloat(int columnIndex) throws IOException {
+ return 0;
+ }
+
+ // Placeholder: always 0, whatever columnIndex is.
+ @Override
+ public double getDouble(int columnIndex) throws IOException {
+ return 0;
+ }
+
+ // Placeholder: always false, whatever columnIndex is.
+ @Override
+ public boolean getBoolean(int columnIndex) throws IOException {
+ return false;
+ }
+
+ // Placeholder: always null, whatever columnIndex is.
+ @Override
+ public Binary getBinary(int columnIndex) throws IOException {
+ return null;
+ }
+
+ // Placeholder: always null, whatever columnIndex is.
+ @Override
+ public String getString(int columnIndex) throws IOException {
+ return null;
+ }
+
+ // Placeholder: always null, whatever columnIndex is.
+ @Override
+ public Type getDataType(int columnIndex) {
+ return null;
+ }
+
+ // Placeholder: reports every column as non-null.
+ @Override
+ public boolean isNull(int columnIndex) {
+ return false;
+ }
+
+ // Placeholder: always 0.
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ // Placeholder: always 0; the declared PipeParameterNotValidException is never thrown here.
+ @Override
+ public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+ return 0;
+ }
+
+ // Placeholder: always null (not an empty list).
+ @Override
+ public List<Path> getColumnNames() {
+ return null;
+ }
+
+ // Placeholder: always null (not an empty list).
+ @Override
+ public List<Type> getColumnTypes() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
new file mode 100644
index 0000000000..960214bea6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/access/PipeRowIterator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.access;
+
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.access.RowIterator;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Skeleton implementation of {@link RowIterator}.
+ *
+ * <p>The class holds no state: it reports that no row is available, {@link #next()} returns
+ * {@code null}, {@link #reset()} does nothing, and the column metadata accessors return fixed
+ * placeholders ({@code 0} or {@code null}). None of the methods currently throws.
+ */
+public class PipeRowIterator implements RowIterator {
+
+ // Placeholder: always false, so the iterator appears empty.
+ @Override
+ public boolean hasNextRow() {
+ return false;
+ }
+
+ // Placeholder: always null.
+ @Override
+ public Row next() throws IOException {
+ return null;
+ }
+
+ // Placeholder: intentionally empty, there is no position to rewind.
+ @Override
+ public void reset() {}
+
+ // Placeholder: always 0; the declared PipeParameterNotValidException is never thrown here.
+ @Override
+ public int getColumnIndex(Path columnName) throws PipeParameterNotValidException {
+ return 0;
+ }
+
+ // Placeholder: always null (not an empty list).
+ @Override
+ public List<Path> getColumnNames() {
+ return null;
+ }
+
+ // Placeholder: always null (not an empty list).
+ @Override
+ public List<Type> getColumnTypes() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
index e42b1f66f3..2c2bbfd38f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeEventCollector.java
@@ -17,22 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.collector;
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+import java.io.IOException;
- private PipeRuntimeAgent() {}
+public class PipeEventCollector implements EventCollector {
- private static class PipeRuntimeAgentHolder {
- private static PipeRuntimeAgent INSTANCE = null;
- }
+ @Override
+ public void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException {}
- static PipeRuntimeAgent setupAndGetInstance() {
- if (PipeRuntimeAgentHolder.INSTANCE == null) {
- PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
- }
- return PipeRuntimeAgentHolder.INSTANCE;
- }
+ @Override
+ public void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException {}
+
+ @Override
+ public void collectDeletionEvent(DeletionEvent event) throws IOException {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
index 5034fb50e7..525e79c137 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/collector/PipeRowCollector.java
@@ -17,22 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.collector;
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+import java.io.IOException;
- private PipeTaskAgent() {}
+public class PipeRowCollector implements RowCollector {
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void collectRow(Row row) throws IOException {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
index 5034fb50e7..22cbce9e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeEventIndexer.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public interface PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
index 5034fb50e7..63aff8a519 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeIoTEventIndexer.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeIoTEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
index 5034fb50e7..4520855085 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeRatisEventIndexer.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeRatisEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
index 5034fb50e7..d5add45547 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeSimpleEventIndexer.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeSimpleEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
index 5034fb50e7..be53e89751 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/indexer/PipeTsFileEventIndexer.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.event.indexer;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeTsFileEventIndexer implements PipeEventIndexer {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
index 5034fb50e7..3153d2ae0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeProcessorPluginRuntimeWrapper.java
@@ -17,22 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.core.processor;
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.PipeProcessor;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeProcessorPluginRuntimeWrapper {
- private PipeTaskAgent() {}
+ private final PipeProcessor pipeProcessor;
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
+ public PipeProcessorPluginRuntimeWrapper(PipeProcessor pipeProcessor) {
+ this.pipeProcessor = pipeProcessor;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
index 5034fb50e7..cc3dd43987 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeAssignerSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
index 5034fb50e7..98eaf31d1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeConnectorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
index 5034fb50e7..c61871fafe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeProcessorSubtaskExecutor implements PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 5034fb50e7..7d97605dff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.executor;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public interface PipeSubtaskExecutor {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
new file mode 100644
index 0000000000..4437fb119d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeTaskExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.executor;
+
+/**
+ * PipeTaskExecutor is responsible for executing the pipe tasks, and it is scheduled by the
+ * PipeTaskScheduler. It is a singleton class: the only way to obtain an instance is {@link
+ * #setupAndGetInstance()}, which is safe to call from multiple threads.
+ */
+public class PipeTaskExecutor {
+
+  private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor =
+      new PipeAssignerSubtaskExecutor();
+  private final PipeProcessorSubtaskExecutor processorSubtaskExecutor =
+      new PipeProcessorSubtaskExecutor();
+  private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor =
+      new PipeConnectorSubtaskExecutor();
+
+  ///////////////////////// Singleton Instance Holder /////////////////////////
+
+  private PipeTaskExecutor() {}
+
+  /**
+   * Holds the singleton. The JVM initializes this nested class on its first use (the first call
+   * to {@link #setupAndGetInstance()}) and guarantees that class initialization runs exactly
+   * once, so no explicit locking or null check is needed.
+   */
+  private static class PipeTaskExecutorHolder {
+    private static final PipeTaskExecutor INSTANCE = new PipeTaskExecutor();
+  }
+
+  /**
+   * Returns the singleton executor, creating it on the first call.
+   *
+   * @return the single shared {@code PipeTaskExecutor}; never {@code null}
+   */
+  public static PipeTaskExecutor setupAndGetInstance() {
+    return PipeTaskExecutorHolder.INSTANCE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
index 5034fb50e7..2cab31d737 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeAssignerSubtaskScheduler.java
@@ -17,22 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeAssignerSubtaskScheduler implements PipeSubtaskScheduler {
+ @Override
+ public void createSubtask(String subtaskId, PipeSubtask subtask) {}
- private PipeTaskAgent() {}
+ @Override
+ public void dropSubtask(String subtaskId) {}
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
+ @Override
+ public void startSubtask(String subtaskId) {}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void stopSubtask(String subtaskId) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
index 5034fb50e7..c53c6b040d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeConnectorSubtaskScheduler.java
@@ -17,22 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeConnectorSubtaskScheduler implements PipeSubtaskScheduler {
+ @Override
+ public void createSubtask(String subtaskId, PipeSubtask subtask) {}
- private PipeTaskAgent() {}
+ @Override
+ public void dropSubtask(String subtaskId) {}
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
+ @Override
+ public void startSubtask(String subtaskId) {}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void stopSubtask(String subtaskId) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
index 5034fb50e7..9f5df481b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeProcessorSubtaskScheduler.java
@@ -17,22 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeProcessorSubtaskScheduler implements PipeSubtaskScheduler {
+ @Override
+ public void createSubtask(String subtaskId, PipeSubtask subtask) {}
- private PipeTaskAgent() {}
+ @Override
+ public void dropSubtask(String subtaskId) {}
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
+ @Override
+ public void startSubtask(String subtaskId) {}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void stopSubtask(String subtaskId) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
index 5034fb50e7..c87f949103 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -17,22 +17,17 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.execution.scheduler;
-public class PipeTaskAgent {
+import org.apache.iotdb.db.pipe.task.runnable.PipeSubtask;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public interface PipeSubtaskScheduler {
- private PipeTaskAgent() {}
+ void createSubtask(String subtaskId, PipeSubtask subtask);
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
+ void dropSubtask(String subtaskId);
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ void startSubtask(String subtaskId);
+
+ void stopSubtask(String subtaskId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
new file mode 100644
index 0000000000..cda2cc9466
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.task.PipeTask;
+
+/**
+ * PipeTaskScheduler is responsible for scheduling the pipe tasks. It takes the pipe tasks and
+ * executes them in the PipeTaskExecutor. It is a singleton class.
+ */
+public class PipeTaskScheduler {
+
+ private final PipeSubtaskScheduler assignerSubtaskScheduler;
+ private final PipeSubtaskScheduler processorSubtaskScheduler;
+ private final PipeSubtaskScheduler connectorSubtaskScheduler;
+
+ public void createPipeTask(PipeTask pipeTask) {}
+
+ public void dropPipeTask(String pipeName) {}
+
+ public void startPipeTask(String pipeName) {}
+
+ public void stopPipeTask(String pipeName) {}
+
+ ///////////////////////// Singleton Instance Holder /////////////////////////
+
+ private PipeTaskScheduler() {
+ assignerSubtaskScheduler = new PipeAssignerSubtaskScheduler();
+ processorSubtaskScheduler = new PipeProcessorSubtaskScheduler();
+ connectorSubtaskScheduler = new PipeConnectorSubtaskScheduler();
+ }
+
+ private static class PipeTaskSchedulerHolder {
+ private static PipeTaskScheduler instance = null;
+ }
+
+ public static PipeTaskScheduler setupAndGetInstance() {
+ if (PipeTaskSchedulerHolder.instance == null) {
+ PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
+ }
+ return PipeTaskSchedulerHolder.instance;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
index 5034fb50e7..25ce3d1142 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeFileManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
index 5034fb50e7..984c3fbaf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeRaftlogHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
index 5034fb50e7..04fe515e59 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileHolder.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeTsFileHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
index 5034fb50e7..78e0a7c612 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.resource;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeWALHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index e42b1f66f3..e3a2819deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -17,22 +17,32 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task;
-public class PipeRuntimeAgent {
+import org.apache.iotdb.db.pipe.task.metrics.PipeTaskRuntimeRecorder;
+import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeTask {
- private PipeRuntimeAgent() {}
+ private final String pipeName;
- private static class PipeRuntimeAgentHolder {
- private static PipeRuntimeAgent INSTANCE = null;
- }
+ private final PipeTaskStage collectorStage;
+ private final PipeTaskStage processorStage;
+ private final PipeTaskStage connectorStage;
+
+ private final PipeTaskRuntimeRecorder runtimeRecorder;
+
+ public PipeTask(
+ String pipeName,
+ PipeTaskStage collectorStage,
+ PipeTaskStage processorStage,
+ PipeTaskStage connectorStage) {
+ this.pipeName = pipeName;
+
+ this.collectorStage = collectorStage;
+ this.processorStage = processorStage;
+ this.connectorStage = connectorStage;
- static PipeRuntimeAgent setupAndGetInstance() {
- if (PipeRuntimeAgentHolder.INSTANCE == null) {
- PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
- }
- return PipeRuntimeAgentHolder.INSTANCE;
+ runtimeRecorder = new PipeTaskRuntimeRecorder();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 5034fb50e7..19e5f460ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -17,22 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+/** PipeTaskBuilder is used to build a PipeTask. */
+public class PipeTaskBuilder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
index 5034fb50e7..6f09614958 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/metrics/PipeTaskRuntimeRecorder.java
@@ -17,22 +17,6 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.metrics;
-public class PipeTaskAgent {
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
-
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
-}
+public class PipeTaskRuntimeRecorder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
index 5034fb50e7..5890801ea6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeAssignerSubtask.java
@@ -17,22 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
-public class PipeTaskAgent {
+public class PipeAssignerSubtask extends PipeSubtask {
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
+ public PipeAssignerSubtask(String taskID) {
+ super(taskID);
}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void runMayThrow() throws Throwable {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
index 5034fb50e7..b199607241 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeConnectorSubtask.java
@@ -17,22 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
-public class PipeTaskAgent {
+public class PipeConnectorSubtask extends PipeSubtask {
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
+ public PipeConnectorSubtask(String taskID) {
+ super(taskID);
}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void runMayThrow() throws Throwable {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
index 5034fb50e7..cfdf0123e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeProcessorSubtask.java
@@ -17,22 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
-public class PipeTaskAgent {
+public class PipeProcessorSubtask extends PipeSubtask {
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskAgent() {}
-
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
+ public PipeProcessorSubtask(String taskID) {
+ super(taskID);
}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void runMayThrow() throws Throwable {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
index 5034fb50e7..daebd15e47 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/runnable/PipeSubtask.java
@@ -17,22 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.runnable;
-public class PipeTaskAgent {
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public abstract class PipeSubtask extends WrappedRunnable {
- private PipeTaskAgent() {}
+ private final String taskID;
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
+ public PipeSubtask(String taskID) {
+ super();
+ this.taskID = taskID;
}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
+ public String getTaskID() {
+ return taskID;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 5034fb50e7..930a06cc94 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -17,22 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeTaskCollectorStage implements PipeTaskStage {
- private PipeTaskAgent() {}
+ @Override
+ public void create() throws PipeException {}
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
+ @Override
+ public void start() throws PipeException {}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void stop() throws PipeException {}
+
+ @Override
+ public void drop() throws PipeException {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
similarity index 62%
copy from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 5034fb50e7..fddb99fb5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -17,22 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeTaskConnectorStage implements PipeTaskStage {
- private PipeTaskAgent() {}
+ @Override
+ public void create() throws PipeException {}
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
+ @Override
+ public void start() throws PipeException {}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void stop() throws PipeException {}
+
+ @Override
+ public void drop() throws PipeException {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
similarity index 62%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 5034fb50e7..5a22721a8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -17,22 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
-public class PipeTaskAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public class PipeTaskProcessorStage implements PipeTaskStage {
- private PipeTaskAgent() {}
+ @Override
+ public void create() throws PipeException {}
- private static class PipeTaskAgentHolder {
- private static PipeTaskAgent instance = null;
- }
+ @Override
+ public void start() throws PipeException {}
- static PipeTaskAgent setupAndGetInstance() {
- if (PipeTaskAgentHolder.instance == null) {
- PipeTaskAgentHolder.instance = new PipeTaskAgent();
- }
- return PipeTaskAgentHolder.instance;
- }
+ @Override
+ public void stop() throws PipeException {}
+
+ @Override
+ public void drop() throws PipeException {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
similarity index 52%
rename from server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index e42b1f66f3..09ae67ef76 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -17,22 +17,37 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent;
+package org.apache.iotdb.db.pipe.task.stage;
-public class PipeRuntimeAgent {
+import org.apache.iotdb.pipe.api.exception.PipeException;
- ///////////////////////// Singleton Instance Holder /////////////////////////
+public interface PipeTaskStage {
- private PipeRuntimeAgent() {}
+ /**
+ * Create a pipe task stage.
+ *
+ * @throws PipeException if the pipe task stage cannot be created.
+ */
+ void create() throws PipeException;
- private static class PipeRuntimeAgentHolder {
- private static PipeRuntimeAgent INSTANCE = null;
- }
+ /**
+ * Start a pipe task stage.
+ *
+ * @throws PipeException if the pipe task stage cannot be started.
+ */
+ void start() throws PipeException;
- static PipeRuntimeAgent setupAndGetInstance() {
- if (PipeRuntimeAgentHolder.INSTANCE == null) {
- PipeRuntimeAgentHolder.INSTANCE = new PipeRuntimeAgent();
- }
- return PipeRuntimeAgentHolder.INSTANCE;
- }
+ /**
+ * Stop a pipe task stage.
+ *
+ * @throws PipeException if the pipe task stage cannot be stopped.
+ */
+ void stop() throws PipeException;
+
+ /**
+ * Drop a pipe task stage.
+ *
+ * @throws PipeException if the pipe task stage cannot be dropped.
+ */
+ void drop() throws PipeException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 308afe736d..39b4e59b9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.service.thrift.impl;
-import com.google.common.collect.ImmutableList;
import org.apache.iotdb.common.rpc.thrift.*;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
@@ -113,6 +112,8 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import com.google.common.collect.ImmutableList;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1068,7 +1069,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
if (configNodeLocations != null) {
ConfigNodeInfo.getInstance()
.updateConfigNodeList(
- configNodeLocations.parallelStream()
+ configNodeLocations
+ .parallelStream()
.map(TConfigNodeLocation::getInternalEndPoint)
.collect(Collectors.toList()));
}