You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/14 06:31:55 UTC

[iotdb] branch master updated: [IOTDB-5870] Pipe: Event reference management (#9836)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new af2d050b54b [IOTDB-5870] Pipe: Event reference management (#9836)
af2d050b54b is described below

commit af2d050b54b0e38e76c9802ed46133a191f74a89
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sun May 14 14:31:49 2023 +0800

    [IOTDB-5870] Pipe: Event reference management (#9836)
    
    * Create event's reference count interface: EnrichedEvent
    
    * Add reference count increasement and decreasement in PipeDataRegionAssginer, PipeRealtimeDataRegionHybridCollector, PipeEventCollector, PipeProcessorSubtask, PipeConnectorSubtask
    
    ---------
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../org/apache/iotdb/pipe/api/PipeCollector.java   |  1 +
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  5 ++
 .../PipeRealtimeDataRegionHybridCollector.java     | 31 ++++++++++++-
 .../realtime/assigner/PipeDataRegionAssigner.java  | 20 ++++++--
 .../matcher/CachedSchemaPatternMatcher.java        |  6 +--
 .../realtime/matcher/PipeDataRegionMatcher.java    |  9 +++-
 .../iotdb/db/pipe/core/event/EnrichedEvent.java    | 54 ++++++++++++++++++++++
 .../core/event/impl/PipeTabletInsertionEvent.java  | 27 ++++++++++-
 .../core/event/impl/PipeTsFileInsertionEvent.java  | 48 ++++++++++++++++++-
 .../event/realtime/PipeRealtimeCollectEvent.java   | 20 +++++++-
 .../realtime/PipeRealtimeCollectEventFactory.java  |  2 -
 .../event/view/collector/PipeEventCollector.java   |  6 +++
 .../iotdb/db/pipe/resource/PipeFileManager.java    | 22 ---------
 ...ceManager.java => PipeFileResourceManager.java} |  4 +-
 .../iotdb/db/pipe/resource/PipeRaftlogHolder.java  | 22 ---------
 .../db/pipe/resource/PipeResourceManager.java      |  8 ++--
 .../iotdb/db/pipe/resource/PipeWALHolder.java      | 22 ---------
 .../db/pipe/task/subtask/PipeConnectorSubtask.java | 26 +++++++----
 .../db/pipe/task/subtask/PipeProcessorSubtask.java | 32 +++++++------
 .../collector/CachedSchemaPatternMatcherTest.java  |  4 +-
 .../core/collector/PipeRealtimeCollectTest.java    | 45 ++++++++++++++----
 ...rTest.java => PipeFileResourceManagerTest.java} | 54 +++++++++++-----------
 22 files changed, 317 insertions(+), 151 deletions(-)

diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
index db08728faff..724c1595571 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.pipe.api.event.Event;
  *       cancelled (the `DROP PIPE` command is executed).
  * </ul>
  */
+// TODO: support event lifecycle management
 public interface PipeCollector extends PipePlugin {
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 8c03766e459..1160ff4a845 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,4 +36,8 @@ public class PipeRuntimeAgent {
         subtask.getTaskID(),
         subtask.getLastFailedCause());
   }
+
+  public void report(PipeRuntimeException pipeRuntimeException) {
+    // TODO: complete this method
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index d8b79fbfcff..ab7b705ce87 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,11 +19,14 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,6 +121,9 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
                   "Unsupported event type %s for Hybrid Realtime Collector %s",
                   collectEvent.getEvent().getType(), this));
       }
+
+      collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
+
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
@@ -138,7 +144,15 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
                 (state.equals(TsFileEpoch.State.EMPTY)) ? TsFileEpoch.State.USING_TABLET : state);
 
     if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
-      return event.getEvent();
+      if (event.increaseReferenceCount(PipeProcessorSubtask.class.getName())) {
+        return event.getEvent();
+      } else {
+        // if the event's reference count can not be increased, it means the data represented by
+        // this event is not reliable anymore. but the data represented by this event
+        // has been carried by the following tsfile event, so we can just discard this event.
+        event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
+        return null;
+      }
     }
     // if the state is USING_TSFILE, discard the event and poll the next one.
     return null;
@@ -160,7 +174,20 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
             });
 
     if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
-      return event.getEvent();
+      if (event.increaseReferenceCount(PipeProcessorSubtask.class.getName())) {
+        return event.getEvent();
+      } else {
+        // if the event's reference count can not be increased, it means the data represented by
+        // this event is not reliable anymore. the data has been lost. we simply discard this event
+        // and report the exception to PipeRuntimeAgent.
+        final String errorMessage =
+            String.format(
+                "TsFile Event %s can not be supplied because the reference count can not be increased, "
+                    + "the data represented by this event is lost",
+                event.getEvent());
+        PipeAgent.runtime().report(new PipeRuntimeNonCriticalException(errorMessage));
+        return null;
+      }
     }
     // if the state is USING_TABLET, discard the event and poll the next one.
     return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 38ee9257bef..4fc03f371cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
 
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.PipeDataRegionMatcher;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -39,18 +40,27 @@ public class PipeDataRegionAssigner {
     this.disruptor =
         new DisruptorQueue.Builder<PipeRealtimeCollectEvent>()
             .setProducerType(ProducerType.SINGLE)
-            .addEventHandler(
-                (event, sequence, endOfBatch) -> {
-                  matcher.match(event);
-                  event.gcSchemaInfo();
-                })
+            .addEventHandler(this::assignToCollector)
             .build();
   }
 
   public void publishToAssign(PipeRealtimeCollectEvent event) {
+    event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
     disruptor.publish(event);
   }
 
+  public void assignToCollector(PipeRealtimeCollectEvent event, long sequence, boolean endOfBatch) {
+    matcher
+        .match(event)
+        .forEach(
+            collector -> {
+              event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
+              collector.collect(event);
+            });
+    event.gcSchemaInfo();
+    event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
+  }
+
   public void startAssignTo(PipeRealtimeDataRegionCollector collector) {
     matcher.register(collector);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
index 0589cf09d7a..c8429331730 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -85,13 +85,13 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
   // TODO: maximum the efficiency of matching when pattern is root
   // TODO: memory control
   @Override
-  public void match(PipeRealtimeCollectEvent event) {
+  public Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent event) {
     final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new HashSet<>();
 
     lock.readLock().lock();
     try {
       if (collectors.isEmpty()) {
-        return;
+        return matchedCollectors;
       }
 
       for (final Map.Entry<String, String[]> entry : event.getSchemaInfo().entrySet()) {
@@ -163,7 +163,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
       lock.readLock().unlock();
     }
 
-    matchedCollectors.forEach(collector -> collector.collect(event));
+    return matchedCollectors;
   }
 
   private Set<PipeRealtimeDataRegionCollector> filterCollectorsByDevice(String device) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
index dcdbffa71eb..6411d7cead1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 
+import java.util.Set;
+
 public interface PipeDataRegionMatcher {
 
   /**
@@ -38,9 +40,12 @@ public interface PipeDataRegionMatcher {
 
   /**
    * Match the event's schema info with the registered collectors' patterns. If the event's schema
-   * info matches the pattern of a collector, the event will be assigned to the collector.
+   * info matches the pattern of a collector, the collector will be returned.
+   *
+   * @param event the event to be matched
+   * @return the matched collectors
    */
-  void match(PipeRealtimeCollectEvent event);
+  Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent event);
 
   /** Clear all the registered collectors and internal data structures. */
   void clear();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
new file mode 100644
index 00000000000..fa9e765ca25
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+/**
+ * EnrichedEvent is an event that can be enriched with additional runtime information. The
+ * additional information mainly includes the reference count of the event.
+ */
+public interface EnrichedEvent {
+
+  /**
+   * Increase the reference count of this event.
+   *
+   * @param holderMessage the message of the invoker
+   * @return true if the reference count is increased successfully, false if the event is not
+   *     controlled by the invoker, which means the data stored in the event is not safe to use
+   */
+  boolean increaseReferenceCount(String holderMessage);
+
+  /**
+   * Decrease the reference count of this event. If the reference count is decreased to 0, the event
+   * can be recycled and the data stored in the event is not safe to use.
+   *
+   * @param holderMessage the message of the invoker
+   * @return true if the reference count is decreased successfully, false otherwise
+   */
+  boolean decreaseReferenceCount(String holderMessage);
+
+  /**
+   * Get the reference count of this event.
+   *
+   * @return the reference count
+   */
+  int getReferenceCount();
+
+  // TODO: ConsensusIndex getConsensusIndex();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 3afeabf9ea9..e97e0da9b18 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -20,20 +20,25 @@
 package org.apache.iotdb.db.pipe.core.event.impl;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
-public class PipeTabletInsertionEvent implements TabletInsertionEvent {
+public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedEvent {
 
   private final InsertNode insertNode;
 
+  private final AtomicInteger referenceCount;
+
   public PipeTabletInsertionEvent(InsertNode insertNode) {
     this.insertNode = insertNode;
+    this.referenceCount = new AtomicInteger(0);
   }
 
   @Override
@@ -51,6 +56,26 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent {
     throw new UnsupportedOperationException("Not implemented yet");
   }
 
+  @Override
+  public boolean increaseReferenceCount(String holderMessage) {
+    // TODO: use WALPipeHandler pinMemtable
+    referenceCount.incrementAndGet();
+    return true;
+  }
+
+  @Override
+  public boolean decreaseReferenceCount(String holderMessage) {
+    // TODO: use WALPipeHandler unpinMemetable
+    referenceCount.decrementAndGet();
+    return true;
+  }
+
+  @Override
+  public int getReferenceCount() {
+    // TODO: use WALPipeHandler unpinMemetable
+    return referenceCount.get();
+  }
+
   @Override
   public String toString() {
     return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 69d527e34ba..1a460b6b0a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -19,13 +19,21 @@
 
 package org.apache.iotdb.db.pipe.core.event.impl;
 
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 
-public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
-  private final File tsFile;
+public class PipeTsFileInsertionEvent implements TsFileInsertionEvent, EnrichedEvent {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
+
+  private File tsFile;
 
   public PipeTsFileInsertionEvent(File tsFile) {
     this.tsFile = tsFile;
@@ -41,6 +49,42 @@ public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
     throw new UnsupportedOperationException("Not implemented yet");
   }
 
+  @Override
+  public boolean increaseReferenceCount(String holderMessage) {
+    try {
+      // TODO: increase reference count for mods & resource files
+      tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn(
+          String.format(
+              "Increase reference count for TsFile %s error. Holder Message: %s",
+              tsFile.getPath(), holderMessage),
+          e);
+      return false;
+    }
+  }
+
+  @Override
+  public boolean decreaseReferenceCount(String holderMessage) {
+    try {
+      PipeResourceManager.file().decreaseFileReference(tsFile);
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn(
+          String.format(
+              "Decrease reference count for TsFile %s error. Holder Message: %s",
+              tsFile.getPath(), holderMessage),
+          e);
+      return false;
+    }
+  }
+
+  @Override
+  public int getReferenceCount() {
+    return PipeResourceManager.file().getFileReferenceCount(tsFile);
+  }
+
   @Override
   public String toString() {
     return "PipeTsFileInsertionEvent{" + "tsFile=" + tsFile + '}';
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 4041e74918e..63fcc891361 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -19,12 +19,13 @@
 
 package org.apache.iotdb.db.pipe.core.event.realtime;
 
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.EventType;
 
 import java.util.Map;
 
-public class PipeRealtimeCollectEvent implements Event {
+public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
 
   private final Event event;
   private final TsFileEpoch tsFileEpoch;
@@ -59,6 +60,23 @@ public class PipeRealtimeCollectEvent implements Event {
     return event.getType();
   }
 
+  @Override
+  public boolean increaseReferenceCount(String holderMessage) {
+    return !(event instanceof EnrichedEvent)
+        || ((EnrichedEvent) event).increaseReferenceCount(holderMessage);
+  }
+
+  @Override
+  public boolean decreaseReferenceCount(String holderMessage) {
+    return !(event instanceof EnrichedEvent)
+        || ((EnrichedEvent) event).decreaseReferenceCount(holderMessage);
+  }
+
+  @Override
+  public int getReferenceCount() {
+    return event instanceof EnrichedEvent ? ((EnrichedEvent) event).getReferenceCount() : 0;
+  }
+
   @Override
   public String toString() {
     return "PipeRealtimeCollectEvent{" + "event=" + event + ", tsFileEpoch=" + tsFileEpoch + '}';
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 2bb1b3174f3..29cbefa8d14 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -30,13 +30,11 @@ public class PipeRealtimeCollectEventFactory {
 
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager();
 
-  // TODO: resource control here?
   public static PipeRealtimeCollectEvent createCollectEvent(File tsFile, TsFileResource resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
         new PipeTsFileInsertionEvent(tsFile), resource);
   }
 
-  // TODO: resource control here?
   public static PipeRealtimeCollectEvent createCollectEvent(
       InsertNode node, TsFileResource resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 0d1d60fdde7..8bde5dda46c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -62,6 +64,10 @@ public class PipeEventCollector implements EventCollector {
   }
 
   private synchronized void collect(Event event) {
+    if (event instanceof EnrichedEvent) {
+      ((EnrichedEvent) event).increaseReferenceCount(PipeConnectorSubtask.class.getName());
+    }
+
     while (!bufferQueue.isEmpty()) {
       final Event bufferedEvent = bufferQueue.peek();
       if (pendingQueue.offer(bufferedEvent)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
deleted file mode 100644
index 25ce3d11420..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.resource;
-
-public class PipeFileManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
index 1666ac09e22..e7d961b3c9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.resource;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 
 import java.io.File;
@@ -32,7 +31,7 @@ import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 
-public class PipeTsFileResourceManager {
+public class PipeFileResourceManager {
 
   private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new HashMap<>();
 
@@ -181,7 +180,6 @@ public class PipeTsFileResourceManager {
    * @param hardlinkOrCopiedFile the copied or hardlinked file
    * @return the reference count of the file
    */
-  @TestOnly
   public synchronized int getFileReferenceCount(File hardlinkOrCopiedFile) {
     return hardlinkOrCopiedFileToReferenceMap.getOrDefault(hardlinkOrCopiedFile.getPath(), 0);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
deleted file mode 100644
index 984c3fbaf24..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.resource;
-
-public class PipeRaftlogHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 2bcdfe4c366..61b4e61a04e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -21,16 +21,16 @@ package org.apache.iotdb.db.pipe.resource;
 
 public class PipeResourceManager {
 
-  private final PipeTsFileResourceManager pipeTsFileResourceManager;
+  private final PipeFileResourceManager pipeFileResourceManager;
 
-  public static PipeTsFileResourceManager tsfile() {
-    return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager;
+  public static PipeFileResourceManager file() {
+    return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
   }
 
   ///////////////////////////// SINGLETON /////////////////////////////
 
   private PipeResourceManager() {
-    pipeTsFileResourceManager = new PipeTsFileResourceManager();
+    pipeFileResourceManager = new PipeFileResourceManager();
   }
 
   private static class PipeResourceManagerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
deleted file mode 100644
index 78e0a7c612d..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.resource;
-
-public class PipeWALHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index e2ceb644808..d9f2ea84bc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -67,20 +68,29 @@ public class PipeConnectorSubtask extends PipeSubtask {
     }
 
     try {
-      if (event instanceof TabletInsertionEvent) {
-        outputPipeConnector.transfer((TabletInsertionEvent) event);
-      } else if (event instanceof TsFileInsertionEvent) {
-        outputPipeConnector.transfer((TsFileInsertionEvent) event);
-      } else if (event instanceof DeletionEvent) {
-        outputPipeConnector.transfer((DeletionEvent) event);
-      } else {
-        throw new RuntimeException("Unsupported event type: " + event.getClass().getName());
+      switch (event.getType()) {
+        case TABLET_INSERTION:
+          outputPipeConnector.transfer((TabletInsertionEvent) event);
+          break;
+        case TSFILE_INSERTION:
+          outputPipeConnector.transfer((TsFileInsertionEvent) event);
+          break;
+        case DELETION:
+          outputPipeConnector.transfer((DeletionEvent) event);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported event type: " + event.getClass().getName());
       }
     } catch (Exception e) {
       e.printStackTrace();
       throw new PipeException(
           "Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
           e);
+    } finally {
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) event).decreaseReferenceCount(PipeConnectorSubtask.class.getName());
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 3b7a59aa9e4..4d15d3a3528 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
-import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -58,26 +58,30 @@ public class PipeProcessorSubtask extends PipeSubtask {
       return;
     }
 
-    if (event instanceof PipeRealtimeCollectEvent) {
-      // dispatch the event
-      event = ((PipeRealtimeCollectEvent) event).getEvent();
-    }
-
     try {
-      if (event instanceof TabletInsertionEvent) {
-        pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
-      } else if (event instanceof TsFileInsertionEvent) {
-        pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
-      } else if (event instanceof DeletionEvent) {
-        pipeProcessor.process((DeletionEvent) event, outputEventCollector);
-      } else {
-        throw new RuntimeException("Unsupported event type: " + event.getClass().getName());
+      switch (event.getType()) {
+        case TABLET_INSERTION:
+          pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
+          break;
+        case TSFILE_INSERTION:
+          pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
+          break;
+        case DELETION:
+          pipeProcessor.process((DeletionEvent) event, outputEventCollector);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported event type: " + event.getClass().getName());
       }
     } catch (Exception e) {
       e.printStackTrace();
       throw new PipeException(
           "Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
           e);
+    } finally {
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) event).decreaseReferenceCount(PipeProcessorSubtask.class.getName());
+      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index f376feb49ca..741e4ddb1b1 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -128,12 +128,12 @@ public class CachedSchemaPatternMatcherTest {
             new PipeRealtimeCollectEvent(
                 null, null, Collections.singletonMap("root." + i, measurements));
         long startTime = System.currentTimeMillis();
-        matcher.match(event);
+        matcher.match(event).forEach(collector -> collector.collect(event));
         totalTime += (System.currentTimeMillis() - startTime);
       }
       PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null, null, deviceMap);
       long startTime = System.currentTimeMillis();
-      matcher.match(event);
+      matcher.match(event).forEach(collector -> collector.collect(event));
       totalTime += (System.currentTimeMillis() - startTime);
     }
     System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 214b8b0774e..f4e7f7f6843 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.core.collector;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -41,6 +43,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -57,26 +61,37 @@ public class PipeRealtimeCollectTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
 
-  private final String dataRegion1 = "dataRegion-1";
-  private final String dataRegion2 = "dataRegion-2";
+  private final String dataRegion1 = "1";
+  private final String dataRegion2 = "2";
   private final String pattern1 = "root.sg.d";
   private final String pattern2 = "root.sg.d.a";
   private final String[] device = new String[] {"root", "sg", "d"};
   private final AtomicBoolean alive = new AtomicBoolean();
+  private File tmpDir;
+  private File tsFileDir;
 
   private ExecutorService writeService;
   private ExecutorService listenerService;
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     writeService = Executors.newFixedThreadPool(2);
     listenerService = Executors.newFixedThreadPool(4);
+    tmpDir = new File(Files.createTempDirectory("pipeRealtimeCollect").toString());
+    tsFileDir =
+        new File(
+            tmpDir.getPath()
+                + File.separator
+                + IoTDBConstant.SEQUENCE_FLODER_NAME
+                + File.separator
+                + "root.sg");
   }
 
   @After
   public void tearDown() {
     writeService.shutdownNow();
     listenerService.shutdownNow();
+    FileUtils.deleteDirectory(tmpDir);
   }
 
   @Test
@@ -140,7 +155,8 @@ public class PipeRealtimeCollectTest {
       int writeNum = 10;
       List<Future<?>> writeFutures =
           Arrays.asList(
-              write2DataRegion(writeNum, dataRegion1), write2DataRegion(writeNum, dataRegion2));
+              write2DataRegion(writeNum, dataRegion1, 0),
+              write2DataRegion(writeNum, dataRegion2, 0));
 
       alive.set(true);
       List<Future<?>> listenFutures =
@@ -175,7 +191,8 @@ public class PipeRealtimeCollectTest {
       // test result of collector 0 - 3
       writeFutures =
           Arrays.asList(
-              write2DataRegion(writeNum, dataRegion1), write2DataRegion(writeNum, dataRegion2));
+              write2DataRegion(writeNum, dataRegion1, writeNum),
+              write2DataRegion(writeNum, dataRegion2, writeNum));
 
       alive.set(true);
       listenFutures =
@@ -211,12 +228,22 @@ public class PipeRealtimeCollectTest {
     }
   }
 
-  private Future<?> write2DataRegion(int writeNum, String dataRegionId) {
+  private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
+    File dataRegionDir =
+        new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
+    boolean ignored = dataRegionDir.mkdirs();
     return writeService.submit(
         () -> {
-          for (int i = 0; i < writeNum; ++i) {
-            TsFileResource resource =
-                new TsFileResource(new File(dataRegionId, String.format("%s-%s-0-0.tsfile", i, i)));
+          for (int i = startNum; i < startNum + writeNum; ++i) {
+            File tsFile = new File(dataRegionDir, String.format("%s-%s-0-0.tsfile", i, i));
+            try {
+              boolean ignored1 = tsFile.createNewFile();
+            } catch (IOException e) {
+              e.printStackTrace();
+              throw new RuntimeException(e);
+            }
+
+            TsFileResource resource = new TsFileResource(tsFile);
             resource.updateStartTime(String.join(TsFileConstant.PATH_SEPARATOR, device), 0);
 
             PipeInsertionDataNodeListener.getInstance()
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
similarity index 77%
rename from server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
index e4fc6b67143..b2441aa9d9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
@@ -47,7 +47,7 @@ import java.nio.file.Files;
 
 import static org.junit.Assert.fail;
 
-public class PipeTsFileResourceManagerTest {
+public class PipeFileResourceManagerTest {
 
   private static final String ROOT_DIR = "target" + File.separator + "PipeTsFileHolderTest";
   private static final String SEQUENCE_DIR =
@@ -55,11 +55,11 @@ public class PipeTsFileResourceManagerTest {
   private static final String TS_FILE_NAME = SEQUENCE_DIR + File.separator + "test.tsfile";
   private static final String MODS_FILE_NAME = TS_FILE_NAME + ".mods";
 
-  private PipeTsFileResourceManager pipeTsFileResourceManager;
+  private PipeFileResourceManager pipeFileResourceManager;
 
   @Before
   public void setUp() throws Exception {
-    pipeTsFileResourceManager = new PipeTsFileResourceManager();
+    pipeFileResourceManager = new PipeFileResourceManager();
 
     createTsfile(TS_FILE_NAME);
     creatModsFile(MODS_FILE_NAME);
@@ -150,27 +150,27 @@ public class PipeTsFileResourceManagerTest {
   public void testIncreaseTsfile() throws IOException {
     File originTsfile = new File(TS_FILE_NAME);
     File originModFile = new File(MODS_FILE_NAME);
-    Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(originTsfile));
-    Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(originModFile));
+    Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(originTsfile));
+    Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(originModFile));
 
-    File pipeTsfile = pipeTsFileResourceManager.increaseFileReference(originTsfile, true);
-    File pipeModFile = pipeTsFileResourceManager.increaseFileReference(originModFile, false);
-    Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
-    Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+    File pipeTsfile = pipeFileResourceManager.increaseFileReference(originTsfile, true);
+    File pipeModFile = pipeFileResourceManager.increaseFileReference(originModFile, false);
+    Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(originTsfile.toPath()));
     Assert.assertTrue(Files.exists(originModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
 
     // test use hardlinkTsFile to increase reference counts
-    pipeTsFileResourceManager.increaseFileReference(pipeTsfile, true);
-    Assert.assertEquals(2, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+    pipeFileResourceManager.increaseFileReference(pipeTsfile, true);
+    Assert.assertEquals(2, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
     Assert.assertTrue(Files.exists(originTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
 
     // test use copyFile to increase reference counts
-    pipeTsFileResourceManager.increaseFileReference(pipeModFile, false);
-    Assert.assertEquals(2, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+    pipeFileResourceManager.increaseFileReference(pipeModFile, false);
+    Assert.assertEquals(2, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(originModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
   }
@@ -180,15 +180,15 @@ public class PipeTsFileResourceManagerTest {
     File originFile = new File(TS_FILE_NAME);
     File originModFile = new File(MODS_FILE_NAME);
 
-    pipeTsFileResourceManager.decreaseFileReference(originFile);
-    pipeTsFileResourceManager.decreaseFileReference(originModFile);
-    Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(originFile));
-    Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(originModFile));
+    pipeFileResourceManager.decreaseFileReference(originFile);
+    pipeFileResourceManager.decreaseFileReference(originModFile);
+    Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(originFile));
+    Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(originModFile));
 
-    File pipeTsfile = pipeTsFileResourceManager.increaseFileReference(originFile, true);
-    File pipeModFile = pipeTsFileResourceManager.increaseFileReference(originModFile, false);
-    Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
-    Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+    File pipeTsfile = pipeFileResourceManager.increaseFileReference(originFile, true);
+    File pipeModFile = pipeFileResourceManager.increaseFileReference(originModFile, false);
+    Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
@@ -199,17 +199,17 @@ public class PipeTsFileResourceManagerTest {
     Assert.assertFalse(Files.exists(originFile.toPath()));
     Assert.assertFalse(Files.exists(originModFile.toPath()));
 
-    Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
-    Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+    Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertFalse(Files.exists(originFile.toPath()));
     Assert.assertFalse(Files.exists(originModFile.toPath()));
     Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
     Assert.assertTrue(Files.exists(pipeModFile.toPath()));
 
-    pipeTsFileResourceManager.decreaseFileReference(pipeTsfile);
-    pipeTsFileResourceManager.decreaseFileReference(pipeModFile);
-    Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
-    Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+    pipeFileResourceManager.decreaseFileReference(pipeTsfile);
+    pipeFileResourceManager.decreaseFileReference(pipeModFile);
+    Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+    Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
     Assert.assertFalse(Files.exists(originFile.toPath()));
     Assert.assertFalse(Files.exists(originModFile.toPath()));
     Assert.assertFalse(Files.exists(pipeTsfile.toPath()));