You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/29 17:02:18 UTC

[iotdb] branch master updated: [IOTDB-5920] Pipe collector modes: TsFile only, log only, realtime only, historical only (#9942)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bd1222d180e [IOTDB-5920] Pipe collector modes: TsFile only, log only, realtime only, historical only (#9942)
bd1222d180e is described below

commit bd1222d180ed8bc539ebdd380c92e12b17bf12b9
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Tue May 30 01:02:12 2023 +0800

    [IOTDB-5920] Pipe collector modes: TsFile only, log only, realtime only, historical only (#9942)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../api/customizer/PipeParameterValidator.java     |  27 ++++
 .../db/pipe/config/PipeCollectorConstant.java      |  10 ++
 .../core/collector/IoTDBDataRegionCollector.java   | 147 ++++++++++++++++++---
 .../PipeHistoricalDataRegionCollector.java}        |  15 +--
 .../PipeHistoricalDataRegionFakeCollector.java     |  56 ++++++++
 .../PipeHistoricalDataRegionTsFileCollector.java   |  42 +++++-
 .../realtime/PipeRealtimeDataRegionCollector.java  |  12 +-
 .../PipeRealtimeDataRegionFakeCollector.java       |  70 ++++++++++
 .../PipeRealtimeDataRegionHybridCollector.java     |  14 +-
 .../PipeRealtimeDataRegionLogCollector.java        | 117 ++++++++++++++++
 .../PipeRealtimeDataRegionTsFileCollector.java     | 118 +++++++++++++++++
 .../listener/PipeInsertionDataNodeListener.java    |  31 ++++-
 .../collector/CachedSchemaPatternMatcherTest.java  |  13 +-
 .../core/collector/PipeRealtimeCollectTest.java    |   2 +
 14 files changed, 628 insertions(+), 46 deletions(-)

diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
index fa0632104b7..2ba9fffa612 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/PipeParameterValidator.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.pipe.api.customizer;
 import org.apache.iotdb.pipe.api.exception.PipeAttributeNotProvidedException;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
+import java.util.Arrays;
+
 public class PipeParameterValidator {
 
   private final PipeParameters parameters;
@@ -49,6 +51,31 @@ public class PipeParameterValidator {
     return this;
   }
 
+  /**
+   * Checks that the attribute {@code key}, when present, equals one of {@code optionalValues}.
+   *
+   * @param canBeOptional whether the attribute may be absent; it never excuses an invalid value
+   * @throws PipeAttributeNotProvidedException if required but absent, or present but not allowed
+   */
+  public PipeParameterValidator validateAttributeValueRange(
+      String key, boolean canBeOptional, String... optionalValues)
+      throws PipeAttributeNotProvidedException {
+    if (!parameters.hasAttribute(key)) {
+      if (!canBeOptional) {
+        throw new PipeAttributeNotProvidedException(String.format("%s should be set.", key));
+      }
+      return this;
+    }
+    // A present value is always checked: returning early for optional attributes would make
+    // this method a no-op for every caller that passes canBeOptional = true.
+    final String actualValue = parameters.getString(key);
+    if (Arrays.asList(optionalValues).contains(actualValue)) {
+      return this;
+    }
+    throw new PipeAttributeNotProvidedException(
+        String.format("%s should be one of %s", key, Arrays.toString(optionalValues)));
+  }
+
   /**
    * Validates the input parameters according to the validation rule given by the user.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
index 06485b8cc2e..105d45f8484 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -28,6 +28,16 @@ public class PipeCollectorConstant {
 
   public static final String DATA_REGION_KEY = "collector.data-region";
 
+  public static final String COLLECTOR_HISTORY_ENABLE_KEY = "collector.history.enable";
+  public static final String COLLECTOR_HISTORY_START_TIME = "collector.history.start-time";
+  public static final String COLLECTOR_HISTORY_END_TIME = "collector.history.end-time";
+
+  public static final String COLLECTOR_REALTIME_ENABLE = "collector.realtime.enable";
+  public static final String COLLECTOR_REALTIME_MODE = "collector.realtime.mode";
+  public static final String COLLECTOR_REALTIME_MODE_HYBRID = "hybrid";
+  public static final String COLLECTOR_REALTIME_MODE_FILE = "file";
+  public static final String COLLECTOR_REALTIME_MODE_LOG = "log";
+
   private PipeCollectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index db5ece66fd7..b26c880b028 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -23,25 +23,47 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionFakeCollector;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionFakeCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionLogCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_ENABLE;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_FILE;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_HYBRID;
+import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_REALTIME_MODE_LOG;
 
 public class IoTDBDataRegionCollector implements PipeCollector {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionCollector.class);
+
   private final AtomicBoolean hasBeenStarted;
 
-  private final PipeRealtimeDataRegionCollector realtimeCollector;
+  private final ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue;
+  private final PipeTaskMeta pipeTaskMeta;
+
   // TODO: support pattern in historical collector
-  private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
+  private PipeHistoricalDataRegionCollector historicalCollector;
+  private PipeRealtimeDataRegionCollector realtimeCollector;
 
   private int dataRegionId;
 
@@ -49,27 +71,102 @@ public class IoTDBDataRegionCollector implements PipeCollector {
       PipeTaskMeta pipeTaskMeta,
       ListenableUnboundedBlockingPendingQueue<Event> collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
+
+    this.pipeTaskMeta = pipeTaskMeta;
+    this.collectorPendingQueue = collectorPendingQueue;
+
+    historicalCollector = new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
     realtimeCollector =
         new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
-    historicalCollector = new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
   }
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
 
-    // TODO: require more attributes
-    realtimeCollector.validate(validator);
+    // validate collector.history.enable and collector.realtime.enable
+    validator
+        .validateAttributeValueRange(
+            COLLECTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
+        .validateAttributeValueRange(
+            COLLECTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
+        .validate(
+            args -> (boolean) args[0] || (boolean) args[1],
+            String.format(
+                "Should not set both %s and %s to false.",
+                COLLECTOR_HISTORY_ENABLE_KEY, COLLECTOR_REALTIME_ENABLE),
+            validator.getParameters().getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true),
+            validator.getParameters().getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true));
+
+    // validate collector.realtime.mode
+    if (validator.getParameters().getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) {
+      validator.validateAttributeValueRange(
+          COLLECTOR_REALTIME_MODE,
+          true,
+          COLLECTOR_REALTIME_MODE_HYBRID,
+          COLLECTOR_REALTIME_MODE_FILE,
+          COLLECTOR_REALTIME_MODE_LOG);
+    }
+
+    constructHistoricalCollector(validator.getParameters());
+    constructRealtimeCollector(validator.getParameters());
+
     historicalCollector.validate(validator);
+    realtimeCollector.validate(validator);
+  }
+
+  private void constructHistoricalCollector(PipeParameters parameters) {
+    // enable historical collector by default
+    historicalCollector =
+        parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true)
+            ? new PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta)
+            : new PipeHistoricalDataRegionFakeCollector();
+  }
+
+  private void constructRealtimeCollector(PipeParameters parameters) {
+    // enable realtime collector by default
+    if (!parameters.getBooleanOrDefault(COLLECTOR_REALTIME_ENABLE, true)) {
+      realtimeCollector = new PipeRealtimeDataRegionFakeCollector(pipeTaskMeta);
+      return;
+    }
+
+    // use hybrid mode by default
+    if (!parameters.hasAttribute(COLLECTOR_REALTIME_MODE)) {
+      realtimeCollector =
+          new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
+      return;
+    }
+
+    switch (parameters.getString(COLLECTOR_REALTIME_MODE)) {
+      case COLLECTOR_REALTIME_MODE_FILE:
+        realtimeCollector =
+            new PipeRealtimeDataRegionTsFileCollector(pipeTaskMeta, collectorPendingQueue);
+        break;
+      case COLLECTOR_REALTIME_MODE_LOG:
+        realtimeCollector =
+            new PipeRealtimeDataRegionLogCollector(pipeTaskMeta, collectorPendingQueue);
+        break;
+      case COLLECTOR_REALTIME_MODE_HYBRID:
+        realtimeCollector =
+            new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
+        break;
+      default:
+        realtimeCollector =
+            new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, collectorPendingQueue);
+        LOGGER.warn(
+            String.format(
+                "Unsupported collector realtime mode: %s, create a hybrid collector.",
+                parameters.getString(COLLECTOR_REALTIME_MODE)));
+    }
   }
 
   @Override
-  public void customize(
-      PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+  public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
+      throws Exception {
     dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
 
-    realtimeCollector.customize(parameters, configuration);
     historicalCollector.customize(parameters, configuration);
+    realtimeCollector.customize(parameters, configuration);
   }
 
   @Override
@@ -79,6 +176,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
     }
     hasBeenStarted.set(true);
 
+    final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null);
+    final DataRegionId dataRegionId = new DataRegionId(this.dataRegionId);
     while (true) {
       // try to start collectors in the data region ...
       // first try to run if data region exists, then try to run if data region does not exist.
@@ -86,29 +185,47 @@ public class IoTDBDataRegionCollector implements PipeCollector {
       // runIfPresent and runIfAbsent operations. in this case, we need to retry.
       if (StorageEngine.getInstance()
               .runIfPresent(
-                  new DataRegionId(dataRegionId),
+                  dataRegionId,
                   (dataRegion -> {
                     dataRegion.writeLock(
                         String.format(
                             "Pipe: starting %s", IoTDBDataRegionCollector.class.getName()));
                     try {
-                      startHistoricalCollectorAndRealtimeCollector();
+                      startHistoricalCollectorAndRealtimeCollector(exceptionHolder);
                     } finally {
                       dataRegion.writeUnlock();
                     }
                   }))
           || StorageEngine.getInstance()
               .runIfAbsent(
-                  new DataRegionId(dataRegionId),
-                  this::startHistoricalCollectorAndRealtimeCollector)) {
+                  dataRegionId,
+                  () -> startHistoricalCollectorAndRealtimeCollector(exceptionHolder))) {
+        rethrowExceptionIfAny(exceptionHolder);
         return;
       }
+      rethrowExceptionIfAny(exceptionHolder);
     }
   }
 
-  public void startHistoricalCollectorAndRealtimeCollector() {
-    historicalCollector.start();
-    realtimeCollector.start();
+  private void startHistoricalCollectorAndRealtimeCollector(
+      AtomicReference<Exception> exceptionHolder) {
+    try {
+      historicalCollector.start();
+      realtimeCollector.start();
+    } catch (Exception e) {
+      exceptionHolder.set(e);
+      LOGGER.warn(
+          String.format(
+              "Start historical collector %s and realtime collector %s error.",
+              historicalCollector, realtimeCollector),
+          e);
+    }
+  }
+
+  private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) {
+    if (exceptionHolder.get() != null) {
+      throw new PipeManagementException("failed to start collectors.", exceptionHolder.get());
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionCollector.java
similarity index 64%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionCollector.java
index 06485b8cc2e..b5eb71c8df0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionCollector.java
@@ -17,18 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.core.collector.historical;
 
-public class PipeCollectorConstant {
+import org.apache.iotdb.pipe.api.PipeCollector;
 
-  public static final String COLLECTOR_KEY = "collector";
+public abstract class PipeHistoricalDataRegionCollector implements PipeCollector {
 
-  public static final String COLLECTOR_PATTERN_KEY = "collector.pattern";
-  public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root";
-
-  public static final String DATA_REGION_KEY = "collector.data-region";
-
-  private PipeCollectorConstant() {
-    throw new IllegalStateException("Utility class");
-  }
+  public abstract boolean hasConsumedAll();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionFakeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionFakeCollector.java
new file mode 100644
index 00000000000..fb61a9b4cc1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionFakeCollector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.historical;
+
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class PipeHistoricalDataRegionFakeCollector extends PipeHistoricalDataRegionCollector {
+
+  @Override
+  public void validate(PipeParameterValidator validator) {}
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {}
+
+  @Override
+  public void start() {}
+
+  @Override
+  public Event supply() {
+    return null;
+  }
+
+  @Override
+  public boolean hasConsumedAll() {
+    return true;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public String toString() {
+    return "PipeHistoricalDataRegionFakeCollector{}";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 9e796bddc77..90041ed1d59 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -25,25 +25,30 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
 
+import java.time.ZoneId;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
-public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
+public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataRegionCollector {
 
   private final PipeTaskMeta pipeTaskMeta;
   private final ProgressIndex startIndex;
 
   private int dataRegionId;
 
+  private long collectStartTime;
+  private long collectEndTime;
+
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
   public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
@@ -60,6 +65,18 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
     dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
+    collectStartTime =
+        parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME)
+            ? DateTimeUtils.convertDatetimeStrToLong(
+                parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME),
+                ZoneId.systemDefault())
+            : Long.MIN_VALUE;
+    collectEndTime =
+        parameters.hasAttribute(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME)
+            ? DateTimeUtils.convertDatetimeStrToLong(
+                parameters.getString(PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME),
+                ZoneId.systemDefault())
+            : Long.MAX_VALUE;
   }
 
   @Override
@@ -81,18 +98,24 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
         pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(true).stream()
-                .filter(resource -> !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()))
+                .filter(
+                    resource ->
+                        !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+                            && isTsFileResourceOverlappedWithTimeRange(resource))
                 .map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
-                .filter(resource -> !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()))
+                .filter(
+                    resource ->
+                        !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+                            && isTsFileResourceOverlappedWithTimeRange(resource))
                 .map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
-            event -> {
-              event.increaseReferenceCount(PipeHistoricalDataRegionTsFileCollector.class.getName());
-            });
+            event ->
+                event.increaseReferenceCount(
+                    PipeHistoricalDataRegionTsFileCollector.class.getName()));
       } finally {
         tsFileManager.readUnlock();
       }
@@ -101,6 +124,11 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
     }
   }
 
+  private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) {
+    return !(resource.getFileEndTime() < collectStartTime
+        || collectEndTime < resource.getFileStartTime());
+  }
+
   @Override
   public Event supply() {
     if (pendingQueue == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index fd790b718cd..46d81b05ed0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -45,8 +45,8 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
   }
 
   @Override
-  public void customize(
-      PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+  public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
+      throws Exception {
     pattern =
         parameters.getStringOrDefault(
             PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
@@ -55,18 +55,22 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
   }
 
   @Override
-  public void start() {
+  public void start() throws Exception {
     PipeInsertionDataNodeListener.getInstance().startListenAndAssign(dataRegionId, this);
   }
 
   @Override
-  public void close() {
+  public void close() throws Exception {
     PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this);
   }
 
   /** @param event the event from the storage engine */
   public abstract void collect(PipeRealtimeCollectEvent event);
 
+  public abstract boolean isNeedListenToTsFile();
+
+  public abstract boolean isNeedListenToInsertNode();
+
   public final String getPattern() {
     return pattern;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionFakeCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
new file mode 100644
index 00000000000..a559472f290
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class PipeRealtimeDataRegionFakeCollector extends PipeRealtimeDataRegionCollector {
+
+  public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
+    super(pipeTaskMeta);
+  }
+
+  @Override
+  public void validate(PipeParameterValidator validator) {}
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {}
+
+  @Override
+  public void start() {}
+
+  @Override
+  public Event supply() {
+    return null;
+  }
+
+  @Override
+  public void collect(PipeRealtimeCollectEvent event) {}
+
+  @Override
+  public boolean isNeedListenToTsFile() {
+    return false;
+  }
+
+  @Override
+  public boolean isNeedListenToInsertNode() {
+    return false;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public String toString() {
+    return "PipeRealtimeDataRegionFakeCollector{}";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 302594b1d02..abb3a109c2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -66,6 +66,16 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
     }
   }
 
+  @Override
+  public boolean isNeedListenToTsFile() {
+    return true;
+  }
+
+  @Override
+  public boolean isNeedListenToInsertNode() {
+    return true;
+  }
+
   private void collectTabletInsertion(PipeRealtimeCollectEvent event) {
     if (isApproachingCapacity()) {
       event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
@@ -120,7 +130,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
       } else {
         throw new UnsupportedOperationException(
             String.format(
-                "Unsupported event type %s for Hybrid Realtime Collector %s",
+                "Unsupported event type %s for Hybrid Realtime Collector %s to supply.",
                 eventToSupply.getClass(), this));
       }
 
@@ -195,7 +205,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
   }
 
   @Override
-  public void close() {
+  public void close() throws Exception {
     super.close();
     pendingQueue.clear();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
new file mode 100644
index 00000000000..2d5150bcd80
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PipeRealtimeDataRegionLogCollector extends PipeRealtimeDataRegionCollector {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeRealtimeDataRegionLogCollector.class);
+
+  // TODO: memory control
+  // This queue is used to store pending events collected by the method collect(). The method
+  // supply() will poll events from this queue and send them to the next pipe plugin.
+  private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+
+  public PipeRealtimeDataRegionLogCollector(
+      PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+    super(pipeTaskMeta);
+    this.pendingQueue = pendingQueue;
+  }
+
+  @Override
+  public void collect(PipeRealtimeCollectEvent event) {
+    event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET);
+
+    if (!(event.getEvent() instanceof TabletInsertionEvent)) {
+      return;
+    }
+
+    if (!pendingQueue.offer(event)) {
+      LOGGER.warn(
+          String.format(
+              "Pending Queue of Log Realtime Collector %s has reached capacity, discard Tablet Event %s, current state %s",
+              this, event, event.getTsFileEpoch().getState(this)));
+      // this would not happen, but just in case.
+      // ListenableUnboundedBlockingPendingQueue is unbounded, so it should never reach capacity.
+      // TODO: memory control when elements in queue are too many.
+    }
+  }
+
+  @Override
+  public boolean isNeedListenToTsFile() {
+    return false;
+  }
+
+  @Override
+  public boolean isNeedListenToInsertNode() {
+    return true;
+  }
+
+  @Override
+  public Event supply() {
+    PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+
+    while (collectEvent != null) {
+      Event suppliedEvent = null;
+
+      if (collectEvent.increaseReferenceCount(PipeRealtimeDataRegionLogCollector.class.getName())) {
+        suppliedEvent = collectEvent.getEvent();
+      } else {
+        // if the event's reference count can not be increased, it means the data represented by
+        // this event is not reliable anymore. the data has been lost. we simply discard this event
+        // and report the exception to PipeRuntimeAgent.
+        final String errorMessage =
+            String.format(
+                "Tablet Event %s can not be supplied because the reference count can not be increased, "
+                    + "the data represented by this event is lost",
+                collectEvent.getEvent());
+        PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+      }
+
+      collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogCollector.class.getName());
+      if (suppliedEvent != null) {
+        return suppliedEvent;
+      }
+
+      collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+    }
+
+    // means the pending queue is empty.
+    return null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    pendingQueue.clear();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
new file mode 100644
index 00000000000..4b2849f0243
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.collector.realtime;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
+import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PipeRealtimeDataRegionTsFileCollector extends PipeRealtimeDataRegionCollector {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileCollector.class);
+
+  // TODO: memory control
+  // This queue is used to store pending events collected by the method collect(). The method
+  // supply() will poll events from this queue and send them to the next pipe plugin.
+  private final ListenableUnboundedBlockingPendingQueue<Event> pendingQueue;
+
+  public PipeRealtimeDataRegionTsFileCollector(
+      PipeTaskMeta pipeTaskMeta, ListenableUnboundedBlockingPendingQueue<Event> pendingQueue) {
+    super(pipeTaskMeta);
+    this.pendingQueue = pendingQueue;
+  }
+
+  @Override
+  public void collect(PipeRealtimeCollectEvent event) {
+    event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
+
+    if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
+      return;
+    }
+
+    if (!pendingQueue.offer(event)) {
+      LOGGER.warn(
+          String.format(
+              "Pending Queue of TsFile Realtime Collector %s has reached capacity, discard TsFile Event %s, current state %s",
+              this, event, event.getTsFileEpoch().getState(this)));
+      // this would not happen, but just in case.
+      // ListenableUnboundedBlockingPendingQueue is unbounded, so it should never reach capacity.
+      // TODO: memory control when elements in queue are too many.
+    }
+  }
+
+  @Override
+  public boolean isNeedListenToTsFile() {
+    return true;
+  }
+
+  @Override
+  public boolean isNeedListenToInsertNode() {
+    return false;
+  }
+
+  @Override
+  public Event supply() {
+    PipeRealtimeCollectEvent collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+
+    while (collectEvent != null) {
+      Event suppliedEvent = null;
+
+      if (collectEvent.increaseReferenceCount(
+          PipeRealtimeDataRegionTsFileCollector.class.getName())) {
+        suppliedEvent = collectEvent.getEvent();
+      } else {
+        // if the event's reference count can not be increased, it means the data represented by
+        // this event is not reliable anymore. the data has been lost. we simply discard this event
+        // and report the exception to PipeRuntimeAgent.
+        final String errorMessage =
+            String.format(
+                "TsFile Event %s can not be supplied because the reference count can not be increased, "
+                    + "the data represented by this event is lost",
+                collectEvent.getEvent());
+        PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+      }
+
+      collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileCollector.class.getName());
+      if (suppliedEvent != null) {
+        return suppliedEvent;
+      }
+
+      collectEvent = (PipeRealtimeCollectEvent) pendingQueue.poll();
+    }
+
+    // means the pending queue is empty.
+    return null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    pendingQueue.clear();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index 68b2c5ca010..ae9cce9a498 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * PipeInsertionEventListener is a singleton in each data node.
@@ -46,6 +47,9 @@ public class PipeInsertionDataNodeListener {
   private final ConcurrentMap<String, PipeDataRegionAssigner> dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
+  private final AtomicInteger listenToTsFileCollectorCount = new AtomicInteger(0);
+  private final AtomicInteger listenToInsertNodeCollectorCount = new AtomicInteger(0);
+
   //////////////////////////// start & stop ////////////////////////////
 
   public synchronized void startListenAndAssign(
@@ -53,6 +57,13 @@ public class PipeInsertionDataNodeListener {
     dataRegionId2Assigner
         .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner())
         .startAssignTo(collector);
+
+    if (collector.isNeedListenToTsFile()) {
+      listenToTsFileCollectorCount.incrementAndGet();
+    }
+    if (collector.isNeedListenToInsertNode()) {
+      listenToInsertNodeCollectorCount.incrementAndGet();
+    }
   }
 
   public synchronized void stopListenAndAssign(
@@ -64,6 +75,13 @@ public class PipeInsertionDataNodeListener {
 
     assigner.stopAssignTo(collector);
 
+    if (collector.isNeedListenToTsFile()) {
+      listenToTsFileCollectorCount.decrementAndGet();
+    }
+    if (collector.isNeedListenToInsertNode()) {
+      listenToInsertNodeCollectorCount.decrementAndGet();
+    }
+
     if (assigner.notMoreCollectorNeededToBeAssigned()) {
       // the removed assigner will is the same as the one referenced by the variable `assigner`
       dataRegionId2Assigner.remove(dataRegionId);
@@ -74,12 +92,11 @@ public class PipeInsertionDataNodeListener {
 
   //////////////////////////// listen to events ////////////////////////////
 
-  // TODO: listen to the tsfile synced from the other cluster
-  // TODO: check whether the method is called on the right place. what is the meaning of the
-  // variable shouldClose before calling this method?
-  // TODO: maximum the efficiency of the method when there is no pipe in the system, avoid
-  // dataRegionId2Assigner.get(dataRegionId);
   public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) {
+    // we don't check whether listenToTsFileCollectorCount.get() == 0 here, because
+    // when using SimpleProgressIndex, the tsfile event needs to be assigned to the
+    // collector even if listenToTsFileCollectorCount.get() == 0 to record the progress
+
     PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
 
     final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
@@ -97,6 +114,10 @@ public class PipeInsertionDataNodeListener {
       WALEntryHandler walEntryHandler,
       InsertNode insertNode,
       TsFileResource tsFileResource) {
+    if (listenToInsertNodeCollectorCount.get() == 0) {
+      return;
+    }
+
     final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
 
     // only events from registered data region will be collected
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index 198761eee3b..6deaed9343d 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -38,7 +38,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -64,7 +63,7 @@ public class CachedSchemaPatternMatcherTest {
   }
 
   @Test
-  public void testCachedMatcher() throws ExecutionException, InterruptedException {
+  public void testCachedMatcher() throws Exception {
     PipeRealtimeDataRegionCollector databaseCollector =
         new PipeRealtimeDataRegionFakeCollector(null);
     databaseCollector.customize(
@@ -181,5 +180,15 @@ public class CachedSchemaPatternMatcherTest {
               });
       Assert.assertTrue(match[0]);
     }
+
+    @Override
+    public boolean isNeedListenToTsFile() {
+      return true;
+    }
+
+    @Override
+    public boolean isNeedListenToInsertNode() {
+      return true;
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 02ef1150a84..27d28114007 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -232,6 +232,8 @@ public class PipeRealtimeCollectTest {
               throw new RuntimeException(e);
             }
           });
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }