You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/22 18:16:01 UTC

[iotdb] branch master updated: [IOTDB-5907] Pipe: pipe subtask scheduler (#9915)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f8516ed32d8 [IOTDB-5907] Pipe: pipe subtask scheduler (#9915)
f8516ed32d8 is described below

commit f8516ed32d89dd96211d9e5a1e5d458b4e16dec1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 23 02:15:54 2023 +0800

    [IOTDB-5907] Pipe: pipe subtask scheduler (#9915)
    
    * remove deletion event and introduce generic event to pipe engine
    
    * remove getType() method of Event
    
    * executeForAWhile -> executeOnce
    
    * safely start pipe tasks
    
    * pipe task scheduler
    
    * rename package
---
 .../statemachine/ConfigRegionStateMachine.java     |  6 +-
 .../manager/pipe/runtime/PipeMetaSyncer.java       | 21 ++++-
 .../builtin/connector/DoNothingConnector.java      |  4 +-
 .../builtin/connector/IoTDBThriftConnector.java    |  4 +-
 .../builtin/processor/DoNothingProcessor.java      | 11 ++-
 .../org/apache/iotdb/pipe/api/PipeConnector.java   | 10 +--
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   | 10 +--
 .../iotdb/pipe/api/collector/EventCollector.java   | 39 ++--------
 .../org/apache/iotdb/pipe/api/event/Event.java     |  6 +-
 .../pipe/api/event/dml/deletion/DeletionEvent.java | 48 ------------
 .../event/dml/insertion/TabletInsertionEvent.java  |  6 --
 .../event/dml/insertion/TsFileInsertionEvent.java  |  6 --
 .../PipeRealtimeDataRegionHybridCollector.java     | 49 ++++++------
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  6 +-
 .../event/realtime/PipeRealtimeCollectEvent.java   |  6 --
 .../event/view/collector/PipeEventCollector.java   | 19 +----
 .../execution/executor/PipeSubtaskExecutor.java    | 24 ++++--
 .../execution/scheduler/PipeSubtaskScheduler.java  | 89 ++++++++++++++++++++++
 .../execution/scheduler/PipeTaskScheduler.java     | 74 ------------------
 .../db/pipe/task/subtask/PipeConnectorSubtask.java | 27 +++----
 .../db/pipe/task/subtask/PipeProcessorSubtask.java | 26 +++----
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    | 34 +++++++--
 .../core/collector/PipeRealtimeCollectTest.java    | 20 +++--
 23 files changed, 242 insertions(+), 303 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 53e7f9ae688..4964ba79774 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -64,7 +64,7 @@ public class ConfigRegionStateMachine
   private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionStateMachine.class);
 
   private static final ExecutorService threadPool =
-      IoTDBThreadPoolFactory.newCachedThreadPool("CQ-recovery");
+      IoTDBThreadPoolFactory.newCachedThreadPool("ConfigNode-Manager-Recovery");
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
@@ -218,7 +218,9 @@ public class ConfigRegionStateMachine
       // 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
       // initialized after notifyLeaderChanged finished
       threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
-      configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync();
+
+      threadPool.submit(
+          () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
     } else {
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index ba9bba61c6a..4d6ebb04ff0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -40,7 +40,8 @@ public class PipeMetaSyncer {
   private static final ScheduledExecutorService SYNC_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.PIPE_META_SYNC_SERVICE.getName());
-  // TODO: make this configurable
+  // TODO: make these configurable
+  private static final long INITIAL_SYNC_DELAY_MINUTES = 3;
   private static final long SYNC_INTERVAL_MINUTES = 3;
 
   private final ConfigManager configManager;
@@ -52,15 +53,29 @@ public class PipeMetaSyncer {
   }
 
   public synchronized void start() {
+    while (configManager.getConsensusManager() == null) {
+      try {
+        LOGGER.info("consensus layer is not ready, sleep 1s...");
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("unexpected interruption during waiting for consensus layer ready.");
+      }
+    }
+
     if (metaSyncFuture == null) {
       metaSyncFuture =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-              SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
+              SYNC_EXECUTOR,
+              this::sync,
+              INITIAL_SYNC_DELAY_MINUTES,
+              SYNC_INTERVAL_MINUTES,
+              TimeUnit.MINUTES);
       LOGGER.info("PipeMetaSyncer is started successfully.");
     }
   }
 
-  private void sync() {
+  private synchronized void sync() {
     final TSStatus status = configManager.getProcedureManager().pipeMetaSync();
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index 00f3b80424a..2522fdc66f6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -61,7 +61,7 @@ public class DoNothingConnector implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) {
+  public void transfer(Event event) {
     // do nothing
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index e252e5be726..82ddd05ba3c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -67,7 +67,7 @@ public class IoTDBThriftConnector implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) {
+  public void transfer(Event event) {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
   }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index 6a18f9a64e8..bc56a8bb3cf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -46,19 +46,18 @@ public class DoNothingProcessor implements PipeProcessor {
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
       throws IOException {
-    eventCollector.collectTabletInsertionEvent(tabletInsertionEvent);
+    eventCollector.collect(tabletInsertionEvent);
   }
 
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
       throws IOException {
-    eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent);
+    eventCollector.collect(tsFileInsertionEvent);
   }
 
   @Override
-  public void process(DeletionEvent deletionEvent, EventCollector eventCollector)
-      throws IOException {
-    eventCollector.collectDeletionEvent(deletionEvent);
+  public void process(Event event, EventCollector eventCollector) throws IOException {
+    eventCollector.collect(event);
   }
 
   @Override
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 5502de8bb37..6d74847e763 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.pipe.api;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -52,7 +52,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
  *             following 3 methods will be called: {@link
  *             PipeConnector#transfer(TabletInsertionEvent)}, {@link
  *             PipeConnector#transfer(TsFileInsertionEvent)} and {@link
- *             PipeConnector#transfer(DeletionEvent)}.
+ *             PipeConnector#transfer(Event)}.
  *       </ul>
  *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
  *       PipeConnector#close() } method will be called.
@@ -130,11 +130,11 @@ public interface PipeConnector extends PipePlugin {
   void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception;
 
   /**
-   * This method is used to transfer the DeletionEvent.
+   * This method is used to transfer the Event.
    *
-   * @param deletionEvent DeletionEvent to be transferred
+   * @param event Event to be transferred
    * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
-  void transfer(DeletionEvent deletionEvent) throws Exception;
+  void transfer(Event event) throws Exception;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index 16a5e81ba6c..e94384d5f23 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
@@ -48,7 +48,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
  *             following 3 methods will be called: {@link
  *             PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
  *             PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
- *             PipeProcessor#process(DeletionEvent, EventCollector)}.
+ *             PipeProcessor#process(Event, EventCollector)}.
  *         <li>PipeConnector serializes the events into binaries and send them to sinks.
  *       </ul>
  *   <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
@@ -107,11 +107,11 @@ public interface PipeProcessor extends PipePlugin {
       throws Exception;
 
   /**
-   * This method is called to process the DeletionEvent.
+   * This method is called to process the Event.
    *
-   * @param deletionEvent DeletionEvent to be processed
+   * @param event Event to be processed
    * @param eventCollector used to collect result events after processing
    * @throws Exception the user can throw errors if necessary
    */
-  void process(DeletionEvent deletionEvent, EventCollector eventCollector) throws Exception;
+  void process(Event event, EventCollector eventCollector) throws Exception;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
index 2e53693d65b..a9ef1f0aa62 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
@@ -19,44 +19,21 @@
 
 package org.apache.iotdb.pipe.api.collector;
 
-import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 
 import java.io.IOException;
 
-/**
- * Used to collect events generated by {@link PipeProcessor#process(TabletInsertionEvent,
- * EventCollector)}, {@link PipeProcessor#process(TsFileInsertionEvent, EventCollector)} or {@link
- * PipeProcessor#process(DeletionEvent, EventCollector)}.
- */
+/** Used to collect events in pipe engine. */
 public interface EventCollector {
 
   /**
-   * Collects an insertion event in form of TabletInsertionEvent.
-   *
-   * @param event TabletInsertionEvent to be collected
-   * @throws IOException if any I/O errors occur
-   * @see TabletInsertionEvent
-   */
-  void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException;
-
-  /**
-   * Collects an insertion event in form of TsFileInsertionEvent.
-   *
-   * @param event TsFileInsertionEvent to be collected
-   * @throws IOException if any I/O errors occur
-   * @see TsFileInsertionEvent
-   */
-  void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException;
-
-  /**
-   * Collects a deletion event.
+   * Collects an Event in pipe engine.
    *
-   * @param event DeletionEvent to be collected
+   * @param event Event to be collected
    * @throws IOException if any I/O errors occur
-   * @see DeletionEvent
+   * @see Event
+   * @see org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent
+   * @see org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent
    */
-  void collectDeletionEvent(DeletionEvent event) throws IOException;
+  void collect(Event event) throws IOException;
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
index f5d8d2fbcd3..74ddf9e47d2 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
@@ -20,8 +20,4 @@
 package org.apache.iotdb.pipe.api.event;
 
 /** This interface is used to abstract events in collaboration tasks. */
-public interface Event {
-
-  /** @return the type of the event */
-  EventType getType();
-}
+public interface Event {}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
deleted file mode 100644
index d1fb966379c..00000000000
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.event.dml.deletion;
-
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-
-/** DeletionEvent is used to define the event of deletion. */
-public interface DeletionEvent extends Event {
-
-  /**
-   * The method is used to get the path pattern of the deleted data.
-   *
-   * @return String
-   */
-  Path getPath();
-
-  /**
-   * The method is used to get the time range of the deleted data.
-   *
-   * @return TimeRange
-   */
-  TimeRange getTimeRange();
-
-  @Override
-  default EventType getType() {
-    return EventType.DELETION;
-  }
-}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index e353bf397e0..9fd6d89428c 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.pipe.api.event.dml.insertion;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import java.util.Iterator;
@@ -54,9 +53,4 @@ public interface TabletInsertionEvent extends Event {
    *     RowCollector
    */
   TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer);
-
-  @Override
-  default EventType getType() {
-    return EventType.TABLET_INSERTION;
-  }
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 2d5badc8072..325f0683cb7 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.pipe.api.event.dml.insertion;
 
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 
 /**
  * TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
@@ -42,9 +41,4 @@ public interface TsFileInsertionEvent extends Event {
    * @return TsFileInsertionEvent
    */
   TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable);
-
-  @Override
-  default EventType getType() {
-    return EventType.TSFILE_INSERTION;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 67d461e8c8a..898b93cb22d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
 import org.slf4j.Logger;
@@ -48,18 +50,17 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
 
   @Override
   public void collect(PipeRealtimeCollectEvent event) {
-    switch (event.getEvent().getType()) {
-      case TABLET_INSERTION:
-        collectTabletInsertion(event);
-        break;
-      case TSFILE_INSERTION:
-        collectTsFileInsertion(event);
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Unsupported event type %s for Hybrid Realtime Collector %s",
-                event.getEvent().getType(), this));
+    final Event eventToCollect = event.getEvent();
+
+    if (eventToCollect instanceof TabletInsertionEvent) {
+      collectTabletInsertion(event);
+    } else if (eventToCollect instanceof TsFileInsertionEvent) {
+      collectTsFileInsertion(event);
+    } else {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Unsupported event type %s for Hybrid Realtime Collector %s",
+              eventToCollect.getClass(), this));
     }
   }
 
@@ -107,18 +108,18 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
 
     while (collectEvent != null) {
       Event suppliedEvent;
-      switch (collectEvent.getEvent().getType()) {
-        case TABLET_INSERTION:
-          suppliedEvent = supplyTabletInsertion(collectEvent);
-          break;
-        case TSFILE_INSERTION:
-          suppliedEvent = supplyTsFileInsertion(collectEvent);
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              String.format(
-                  "Unsupported event type %s for Hybrid Realtime Collector %s",
-                  collectEvent.getEvent().getType(), this));
+
+      // used to judge type of event, not directly for supplying.
+      final Event eventToSupply = collectEvent.getEvent();
+      if (eventToSupply instanceof TabletInsertionEvent) {
+        suppliedEvent = supplyTabletInsertion(collectEvent);
+      } else if (eventToSupply instanceof TsFileInsertionEvent) {
+        suppliedEvent = supplyTsFileInsertion(collectEvent);
+      } else {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unsupported event type %s for Hybrid Realtime Collector %s",
+                eventToSupply.getClass(), this));
       }
 
       collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 32408769579..6b414c03b36 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -218,8 +218,8 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
   }
 
   @Override
-  public void transfer(DeletionEvent deletionEvent) throws Exception {
-    throw new NotImplementedException("Not implement for deletion event.");
+  public void transfer(Event event) {
+    LOGGER.warn("IoTDBThriftConnectorV1 does not support transfer generic event: {}.", event);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 63fcc891361..0c63131e6e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
 
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
 
 import java.util.Map;
 
@@ -55,11 +54,6 @@ public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
     device2Measurements = null;
   }
 
-  @Override
-  public EventType getType() {
-    return event.getType();
-  }
-
   @Override
   public boolean increaseReferenceCount(String holderMessage) {
     return !(event instanceof EnrichedEvent)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index a1f443a9d43..c38b7c28a67 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -23,9 +23,6 @@ import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.util.LinkedList;
 import java.util.Queue;
@@ -48,21 +45,7 @@ public class PipeEventCollector implements EventCollector {
   }
 
   @Override
-  public void collectTabletInsertionEvent(TabletInsertionEvent event) {
-    collect(event);
-  }
-
-  @Override
-  public void collectTsFileInsertionEvent(TsFileInsertionEvent event) {
-    collect(event);
-  }
-
-  @Override
-  public void collectDeletionEvent(DeletionEvent event) {
-    collect(event);
-  }
-
-  private synchronized void collect(Event event) {
+  public synchronized void collect(Event event) {
     if (event instanceof EnrichedEvent) {
       ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 762561546b3..d1befd3e08a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.execution.executor;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -44,7 +45,8 @@ public abstract class PipeSubtaskExecutor {
 
   private final Map<String, PipeSubtask> registeredIdSubtaskMapper;
 
-  private int corePoolSize;
+  private final int corePoolSize;
+  private int runningSubtaskNumber;
 
   protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) {
     subtaskWorkerThreadPoolExecutor =
@@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor {
     registeredIdSubtaskMapper = new ConcurrentHashMap<>();
 
     this.corePoolSize = corePoolSize;
+    runningSubtaskNumber = 0;
   }
 
   /////////////////////// subtask management ///////////////////////
@@ -65,7 +68,10 @@ public abstract class PipeSubtaskExecutor {
     }
 
     registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask);
-    subtask.bindExecutors(subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor);
+    subtask.bindExecutors(
+        subtaskWorkerThreadPoolExecutor,
+        subtaskCallbackListeningExecutor,
+        new PipeSubtaskScheduler(this));
   }
 
   public final synchronized void start(String subTaskID) {
@@ -82,6 +88,7 @@ public abstract class PipeSubtaskExecutor {
     } else {
       subtask.allowSubmittingSelf();
       subtask.submitSelf();
+      ++runningSubtaskNumber;
       LOGGER.info("The subtask {} is started to submit self.", subTaskID);
     }
   }
@@ -92,7 +99,9 @@ public abstract class PipeSubtaskExecutor {
       return;
     }
 
-    registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf();
+    if (registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf()) {
+      --runningSubtaskNumber;
+    }
   }
 
   public final synchronized void deregister(String subTaskID) {
@@ -138,12 +147,11 @@ public abstract class PipeSubtaskExecutor {
     return subtaskWorkerThreadPoolExecutor.isShutdown();
   }
 
-  public final void adjustExecutorThreadNumber(int threadNum) {
-    corePoolSize = threadNum;
-    throw new UnsupportedOperationException("Not implemented yet.");
+  public final int getCorePoolSize() {
+    return corePoolSize;
   }
 
-  public final int getExecutorThreadNumber() {
-    return corePoolSize;
+  public final int getRunningSubtaskNumber() {
+    return runningSubtaskNumber;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
new file mode 100644
index 00000000000..dada354e743
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+
+public class PipeSubtaskScheduler {
+
+  private final PipeSubtaskExecutor executor;
+
+  private boolean isFirstSchedule = true;
+
+  // TODO: make the two basic checkpoint intervals below configurable
+
+  private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT = 10_000;
+  private int consumedEventCountCheckpointInterval;
+  private int consumedEventCount;
+
+  // in ms
+  private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION = 10 * 1000L;
+  private long timeDurationCheckpointInterval;
+  private long lastCheckTime;
+
+  public PipeSubtaskScheduler(PipeSubtaskExecutor executor) {
+    this.executor = executor;
+  }
+
+  public boolean schedule() {
+    if (isFirstSchedule) {
+      isFirstSchedule = false;
+
+      adjustCheckpointIntervalBasedOnExecutorStatus();
+
+      ++consumedEventCount;
+      return true;
+    }
+
+    if (consumedEventCount < consumedEventCountCheckpointInterval
+        && System.currentTimeMillis() - lastCheckTime < timeDurationCheckpointInterval) {
+      ++consumedEventCount;
+      return true;
+    }
+
+    return false;
+  }
+
+  private void adjustCheckpointIntervalBasedOnExecutorStatus() {
+    // 1. reset consumedEventCount and lastCheckTime
+    consumedEventCount = 0;
+    lastCheckTime = System.currentTimeMillis();
+
+    // 2. adjust checkpoint interval
+    final int corePoolSize = Math.max(1, executor.getCorePoolSize());
+    final int runningSubtaskNumber = Math.max(1, executor.getRunningSubtaskNumber());
+    consumedEventCountCheckpointInterval =
+        Math.max(
+            1,
+            (int)
+                (((float) BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT / runningSubtaskNumber)
+                    * corePoolSize));
+    timeDurationCheckpointInterval =
+        Math.max(
+            1,
+            (long)
+                (((float) BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION / runningSubtaskNumber)
+                    * corePoolSize));
+  }
+
+  public void reset() {
+    isFirstSchedule = true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
deleted file mode 100644
index 4f035ca6715..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.execution.scheduler;
-
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-
-/**
- * PipeTaskScheduler is a singleton class that manages the numbers of threads used by
- * PipeTaskExecutors dynamically.
- */
-public class PipeTaskScheduler {
-
-  private final PipeSubtaskExecutorManager pipeSubtaskExecutorManager =
-      PipeSubtaskExecutorManager.getInstance();
-
-  public void adjustAssignerSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getAssignerSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  public void adjustConnectorSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getConnectorSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  public void adjustProcessorSubtaskExecutorThreadNum(int threadNum) {
-    // TODO: make it configurable by setting different parameters
-    pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
-  }
-
-  public int getProcessorSubtaskExecutorThreadNum() {
-    return pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
-  }
-
-  /////////////////////////  Singleton Instance Holder  /////////////////////////
-
-  private PipeTaskScheduler() {}
-
-  private static class PipeTaskSchedulerHolder {
-    private static PipeTaskScheduler instance = null;
-  }
-
-  public static PipeTaskScheduler setupAndGetInstance() {
-    if (PipeTaskSchedulerHolder.instance == null) {
-      PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
-    }
-    return PipeTaskSchedulerHolder.instance;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 80f366eb096..742b2230fa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -50,9 +49,8 @@ public class PipeConnectorSubtask extends PipeSubtask {
     this.outputPipeConnector = outputPipeConnector;
   }
 
-  // TODO: for a while
   @Override
-  protected synchronized void executeForAWhile() {
+  protected synchronized boolean executeOnce() {
     try {
       // TODO: reduce the frequency of heartbeat
       outputPipeConnector.heartbeat();
@@ -65,23 +63,16 @@ public class PipeConnectorSubtask extends PipeSubtask {
     // record this event for retrying on connection failure or other exceptions
     lastEvent = event;
     if (event == null) {
-      return;
+      return false;
     }
 
     try {
-      switch (event.getType()) {
-        case TABLET_INSERTION:
-          outputPipeConnector.transfer((TabletInsertionEvent) event);
-          break;
-        case TSFILE_INSERTION:
-          outputPipeConnector.transfer((TsFileInsertionEvent) event);
-          break;
-        case DELETION:
-          outputPipeConnector.transfer((DeletionEvent) event);
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Unsupported event type: " + event.getClass().getName());
+      if (event instanceof TabletInsertionEvent) {
+        outputPipeConnector.transfer((TabletInsertionEvent) event);
+      } else if (event instanceof TsFileInsertionEvent) {
+        outputPipeConnector.transfer((TsFileInsertionEvent) event);
+      } else {
+        outputPipeConnector.transfer(event);
       }
 
       releaseLastEvent();
@@ -93,6 +84,8 @@ public class PipeConnectorSubtask extends PipeSubtask {
           "Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
           e);
     }
+
+    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index feb584fcaff..6a76beb02b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -51,28 +50,21 @@ public class PipeProcessorSubtask extends PipeSubtask {
   }
 
   @Override
-  protected synchronized void executeForAWhile() throws Exception {
+  protected synchronized boolean executeOnce() throws Exception {
     final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply();
     // record the last event for retry when exception occurs
     lastEvent = event;
     if (event == null) {
-      return;
+      return false;
     }
 
     try {
-      switch (event.getType()) {
-        case TABLET_INSERTION:
-          pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
-          break;
-        case TSFILE_INSERTION:
-          pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
-          break;
-        case DELETION:
-          pipeProcessor.process((DeletionEvent) event, outputEventCollector);
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Unsupported event type: " + event.getClass().getName());
+      if (event instanceof TabletInsertionEvent) {
+        pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
+      } else if (event instanceof TsFileInsertionEvent) {
+        pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
+      } else {
+        pipeProcessor.process(event, outputEventCollector);
       }
 
       releaseLastEvent();
@@ -82,6 +74,8 @@ public class PipeProcessorSubtask extends PipeSubtask {
           "Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
           e);
     }
+
+    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index c770b983b9a..5ddff7b2f29 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -47,6 +48,8 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
   private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
   private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
 
+  private PipeSubtaskScheduler subtaskScheduler;
+
   protected static final int MAX_RETRY_TIMES = 5;
   private final AtomicInteger retryCount = new AtomicInteger(0);
   protected Throwable lastFailedCause;
@@ -60,14 +63,25 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   public void bindExecutors(
       ListeningExecutorService subtaskWorkerThreadPoolExecutor,
-      ExecutorService subtaskCallbackListeningExecutor) {
+      ExecutorService subtaskCallbackListeningExecutor,
+      PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
     this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
+    this.subtaskScheduler = subtaskScheduler;
   }
 
   @Override
   public Void call() throws Exception {
-    executeForAWhile();
+    // while the scheduler allows scheduling, try to consume an event
+    while (subtaskScheduler.schedule()) {
+      // if the event is consumed successfully, then continue to consume the next event
+      // otherwise, stop consuming
+      if (!executeOnce()) {
+        break;
+      }
+    }
+    // reset the scheduler to make sure that the scheduler can schedule again
+    subtaskScheduler.reset();
 
     // wait for the callable to be decorated by Futures.addCallback in the executorService
     // to make sure that the callback can be submitted again on success or failure.
@@ -76,7 +90,13 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
     return null;
   }
 
-  protected abstract void executeForAWhile() throws Exception;
+  /**
+   * Tries to consume one event with the pipe plugin.
+   *
+   * @return true if the event is consumed successfully, false if no more events can be consumed
+   * @throws Exception if any error occurs when consuming the event
+   */
+  protected abstract boolean executeOnce() throws Exception;
 
   @Override
   public void onSuccess(Void result) {
@@ -125,8 +145,12 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
     shouldStopSubmittingSelf.set(false);
   }
 
-  public void disallowSubmittingSelf() {
-    shouldStopSubmittingSelf.set(true);
+  /**
+   * @return true if the shouldStopSubmittingSelf state is changed from false to true, false
+   *     otherwise
+   */
+  public boolean disallowSubmittingSelf() {
+    return !shouldStopSubmittingSelf.getAndSet(true);
   }
 
   public boolean isSubmittingSelf() {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index a2c3508b294..7cd705af588 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDa
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
 import org.junit.After;
@@ -163,9 +163,9 @@ public class PipeRealtimeCollectTest {
           Arrays.asList(
               listen(
                   collectors[0],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[1], typ2 -> 1, writeNum));
+              listen(collectors[1], event -> 1, writeNum));
 
       try {
         listenFutures.get(0).get(10, TimeUnit.MINUTES);
@@ -199,14 +199,14 @@ public class PipeRealtimeCollectTest {
           Arrays.asList(
               listen(
                   collectors[0],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[1], typ2 -> 1, writeNum),
+              listen(collectors[1], event -> 1, writeNum),
               listen(
                   collectors[2],
-                  type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+                  event -> event instanceof TabletInsertionEvent ? 1 : 2,
                   writeNum << 1),
-              listen(collectors[3], typ2 -> 1, writeNum));
+              listen(collectors[3], event -> 1, writeNum));
       try {
         listenFutures.get(0).get(10, TimeUnit.MINUTES);
         listenFutures.get(1).get(10, TimeUnit.MINUTES);
@@ -278,9 +278,7 @@ public class PipeRealtimeCollectTest {
   }
 
   private Future<?> listen(
-      PipeRealtimeDataRegionCollector collector,
-      Function<EventType, Integer> weight,
-      int expectNum) {
+      PipeRealtimeDataRegionCollector collector, Function<Event, Integer> weight, int expectNum) {
     return listenerService.submit(
         () -> {
           int eventNum = 0;
@@ -293,7 +291,7 @@ public class PipeRealtimeCollectTest {
                 throw new RuntimeException(e);
               }
               if (event != null) {
-                eventNum += weight.apply(event.getType());
+                eventNum += weight.apply(event);
               }
             }
           } finally {