You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/14 06:31:55 UTC
[iotdb] branch master updated: [IOTDB-5870] Pipe: Event reference management (#9836)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new af2d050b54b [IOTDB-5870] Pipe: Event reference management (#9836)
af2d050b54b is described below
commit af2d050b54b0e38e76c9802ed46133a191f74a89
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sun May 14 14:31:49 2023 +0800
[IOTDB-5870] Pipe: Event reference management (#9836)
* Create event's reference count interface: EnrichedEvent
* Add reference count increment and decrement in PipeDataRegionAssigner, PipeRealtimeDataRegionHybridCollector, PipeEventCollector, PipeProcessorSubtask, PipeConnectorSubtask
---------
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../org/apache/iotdb/pipe/api/PipeCollector.java | 1 +
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 5 ++
.../PipeRealtimeDataRegionHybridCollector.java | 31 ++++++++++++-
.../realtime/assigner/PipeDataRegionAssigner.java | 20 ++++++--
.../matcher/CachedSchemaPatternMatcher.java | 6 +--
.../realtime/matcher/PipeDataRegionMatcher.java | 9 +++-
.../iotdb/db/pipe/core/event/EnrichedEvent.java | 54 ++++++++++++++++++++++
.../core/event/impl/PipeTabletInsertionEvent.java | 27 ++++++++++-
.../core/event/impl/PipeTsFileInsertionEvent.java | 48 ++++++++++++++++++-
.../event/realtime/PipeRealtimeCollectEvent.java | 20 +++++++-
.../realtime/PipeRealtimeCollectEventFactory.java | 2 -
.../event/view/collector/PipeEventCollector.java | 6 +++
.../iotdb/db/pipe/resource/PipeFileManager.java | 22 ---------
...ceManager.java => PipeFileResourceManager.java} | 4 +-
.../iotdb/db/pipe/resource/PipeRaftlogHolder.java | 22 ---------
.../db/pipe/resource/PipeResourceManager.java | 8 ++--
.../iotdb/db/pipe/resource/PipeWALHolder.java | 22 ---------
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 26 +++++++----
.../db/pipe/task/subtask/PipeProcessorSubtask.java | 32 +++++++------
.../collector/CachedSchemaPatternMatcherTest.java | 4 +-
.../core/collector/PipeRealtimeCollectTest.java | 45 ++++++++++++++----
...rTest.java => PipeFileResourceManagerTest.java} | 54 +++++++++++-----------
22 files changed, 317 insertions(+), 151 deletions(-)
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
index db08728faff..724c1595571 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeCollector.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.pipe.api.event.Event;
* cancelled (the `DROP PIPE` command is executed).
* </ul>
*/
+// TODO: support event lifecycle management
public interface PipeCollector extends PipePlugin {
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 8c03766e459..1160ff4a845 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.runtime;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,4 +36,8 @@ public class PipeRuntimeAgent {
subtask.getTaskID(),
subtask.getLastFailedCause());
}
+
+ public void report(PipeRuntimeException pipeRuntimeException) {
+ // TODO: complete this method
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index d8b79fbfcff..ab7b705ce87 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,6 +121,9 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
"Unsupported event type %s for Hybrid Realtime Collector %s",
collectEvent.getEvent().getType(), this));
}
+
+ collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
+
if (suppliedEvent != null) {
return suppliedEvent;
}
@@ -138,7 +144,15 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
(state.equals(TsFileEpoch.State.EMPTY)) ? TsFileEpoch.State.USING_TABLET : state);
if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
- return event.getEvent();
+ if (event.increaseReferenceCount(PipeProcessorSubtask.class.getName())) {
+ return event.getEvent();
+ } else {
+ // if the event's reference count can not be increased, it means the data represented by
+ // this event is not reliable anymore. but the data represented by this event
+ // has been carried by the following tsfile event, so we can just discard this event.
+ event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
+ return null;
+ }
}
// if the state is USING_TSFILE, discard the event and poll the next one.
return null;
@@ -160,7 +174,20 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
});
if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
- return event.getEvent();
+ if (event.increaseReferenceCount(PipeProcessorSubtask.class.getName())) {
+ return event.getEvent();
+ } else {
+ // if the event's reference count can not be increased, it means the data represented by
+ // this event is not reliable anymore. the data has been lost. we simply discard this event
+ // and report the exception to PipeRuntimeAgent.
+ final String errorMessage =
+ String.format(
+ "TsFile Event %s can not be supplied because the reference count can not be increased, "
+ + "the data represented by this event is lost",
+ event.getEvent());
+ PipeAgent.runtime().report(new PipeRuntimeNonCriticalException(errorMessage));
+ return null;
+ }
}
// if the state is USING_TABLET, discard the event and poll the next one.
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 38ee9257bef..4fc03f371cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
+import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.PipeDataRegionMatcher;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -39,18 +40,27 @@ public class PipeDataRegionAssigner {
this.disruptor =
new DisruptorQueue.Builder<PipeRealtimeCollectEvent>()
.setProducerType(ProducerType.SINGLE)
- .addEventHandler(
- (event, sequence, endOfBatch) -> {
- matcher.match(event);
- event.gcSchemaInfo();
- })
+ .addEventHandler(this::assignToCollector)
.build();
}
public void publishToAssign(PipeRealtimeCollectEvent event) {
+ event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
disruptor.publish(event);
}
+ public void assignToCollector(PipeRealtimeCollectEvent event, long sequence, boolean endOfBatch) {
+ matcher
+ .match(event)
+ .forEach(
+ collector -> {
+ event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
+ collector.collect(event);
+ });
+ event.gcSchemaInfo();
+ event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
+ }
+
public void startAssignTo(PipeRealtimeDataRegionCollector collector) {
matcher.register(collector);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
index 0589cf09d7a..c8429331730 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -85,13 +85,13 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
// TODO: maximum the efficiency of matching when pattern is root
// TODO: memory control
@Override
- public void match(PipeRealtimeCollectEvent event) {
+ public Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent event) {
final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new HashSet<>();
lock.readLock().lock();
try {
if (collectors.isEmpty()) {
- return;
+ return matchedCollectors;
}
for (final Map.Entry<String, String[]> entry : event.getSchemaInfo().entrySet()) {
@@ -163,7 +163,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
lock.readLock().unlock();
}
- matchedCollectors.forEach(collector -> collector.collect(event));
+ return matchedCollectors;
}
private Set<PipeRealtimeDataRegionCollector> filterCollectorsByDevice(String device) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
index dcdbffa71eb..6411d7cead1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/PipeDataRegionMatcher.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import java.util.Set;
+
public interface PipeDataRegionMatcher {
/**
@@ -38,9 +40,12 @@ public interface PipeDataRegionMatcher {
/**
* Match the event's schema info with the registered collectors' patterns. If the event's schema
- * info matches the pattern of a collector, the event will be assigned to the collector.
+ * info matches the pattern of a collector, the collector will be returned.
+ *
+ * @param event the event to be matched
+ * @return the matched collectors
*/
- void match(PipeRealtimeCollectEvent event);
+ Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent event);
/** Clear all the registered collectors and internal data structures. */
void clear();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
new file mode 100644
index 00000000000..fa9e765ca25
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+/**
+ * EnrichedEvent is an event that can be enriched with additional runtime information. The
+ * additional information mainly includes the reference count of the event.
+ */
+public interface EnrichedEvent {
+
+ /**
+ * Increase the reference count of this event.
+ *
+ * @param holderMessage the message of the invoker
+ * @return true if the reference count is increased successfully, false if the event is not
+ * controlled by the invoker, which means the data stored in the event is not safe to use
+ */
+ boolean increaseReferenceCount(String holderMessage);
+
+ /**
+ * Decrease the reference count of this event. If the reference count is decreased to 0, the event
+ * can be recycled and the data stored in the event is not safe to use.
+ *
+ * @param holderMessage the message of the invoker
+ * @return true if the reference count is decreased successfully, false otherwise
+ */
+ boolean decreaseReferenceCount(String holderMessage);
+
+ /**
+ * Get the reference count of this event.
+ *
+ * @return the reference count
+ */
+ int getReferenceCount();
+
+ // TODO: ConsensusIndex getConsensusIndex();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 3afeabf9ea9..e97e0da9b18 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -20,20 +20,25 @@
package org.apache.iotdb.db.pipe.core.event.impl;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
-public class PipeTabletInsertionEvent implements TabletInsertionEvent {
+public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedEvent {
private final InsertNode insertNode;
+ private final AtomicInteger referenceCount;
+
public PipeTabletInsertionEvent(InsertNode insertNode) {
this.insertNode = insertNode;
+ this.referenceCount = new AtomicInteger(0);
}
@Override
@@ -51,6 +56,26 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent {
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public boolean increaseReferenceCount(String holderMessage) {
+ // TODO: use WALPipeHandler pinMemtable
+ referenceCount.incrementAndGet();
+ return true;
+ }
+
+ @Override
+ public boolean decreaseReferenceCount(String holderMessage) {
+ // TODO: use WALPipeHandler unpinMemtable
+ referenceCount.decrementAndGet();
+ return true;
+ }
+
+ @Override
+ public int getReferenceCount() {
+ // TODO: use WALPipeHandler unpinMemtable
+ return referenceCount.get();
+ }
+
@Override
public String toString() {
return "PipeTabletInsertionEvent{" + "insertNode=" + insertNode + '}';
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 69d527e34ba..1a460b6b0a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -19,13 +19,21 @@
package org.apache.iotdb.db.pipe.core.event.impl;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
-public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
- private final File tsFile;
+public class PipeTsFileInsertionEvent implements TsFileInsertionEvent, EnrichedEvent {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
+
+ private File tsFile;
public PipeTsFileInsertionEvent(File tsFile) {
this.tsFile = tsFile;
@@ -41,6 +49,42 @@ public class PipeTsFileInsertionEvent implements TsFileInsertionEvent {
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public boolean increaseReferenceCount(String holderMessage) {
+ try {
+ // TODO: increase reference count for mods & resource files
+ tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Increase reference count for TsFile %s error. Holder Message: %s",
+ tsFile.getPath(), holderMessage),
+ e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean decreaseReferenceCount(String holderMessage) {
+ try {
+ PipeResourceManager.file().decreaseFileReference(tsFile);
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn(
+ String.format(
+ "Decrease reference count for TsFile %s error. Holder Message: %s",
+ tsFile.getPath(), holderMessage),
+ e);
+ return false;
+ }
+ }
+
+ @Override
+ public int getReferenceCount() {
+ return PipeResourceManager.file().getFileReferenceCount(tsFile);
+ }
+
@Override
public String toString() {
return "PipeTsFileInsertionEvent{" + "tsFile=" + tsFile + '}';
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 4041e74918e..63fcc891361 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -19,12 +19,13 @@
package org.apache.iotdb.db.pipe.core.event.realtime;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.EventType;
import java.util.Map;
-public class PipeRealtimeCollectEvent implements Event {
+public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
private final Event event;
private final TsFileEpoch tsFileEpoch;
@@ -59,6 +60,23 @@ public class PipeRealtimeCollectEvent implements Event {
return event.getType();
}
+ @Override
+ public boolean increaseReferenceCount(String holderMessage) {
+ return !(event instanceof EnrichedEvent)
+ || ((EnrichedEvent) event).increaseReferenceCount(holderMessage);
+ }
+
+ @Override
+ public boolean decreaseReferenceCount(String holderMessage) {
+ return !(event instanceof EnrichedEvent)
+ || ((EnrichedEvent) event).decreaseReferenceCount(holderMessage);
+ }
+
+ @Override
+ public int getReferenceCount() {
+ return event instanceof EnrichedEvent ? ((EnrichedEvent) event).getReferenceCount() : 0;
+ }
+
@Override
public String toString() {
return "PipeRealtimeCollectEvent{" + "event=" + event + ", tsFileEpoch=" + tsFileEpoch + '}';
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 2bb1b3174f3..29cbefa8d14 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -30,13 +30,11 @@ public class PipeRealtimeCollectEventFactory {
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager();
- // TODO: resource control here?
public static PipeRealtimeCollectEvent createCollectEvent(File tsFile, TsFileResource resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
new PipeTsFileInsertionEvent(tsFile), resource);
}
- // TODO: resource control here?
public static PipeRealtimeCollectEvent createCollectEvent(
InsertNode node, TsFileResource resource) {
return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 0d1d60fdde7..8bde5dda46c 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.core.event.view.collector;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -62,6 +64,10 @@ public class PipeEventCollector implements EventCollector {
}
private synchronized void collect(Event event) {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event).increaseReferenceCount(PipeConnectorSubtask.class.getName());
+ }
+
while (!bufferQueue.isEmpty()) {
final Event bufferedEvent = bufferQueue.peek();
if (pendingQueue.offer(bufferedEvent)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
deleted file mode 100644
index 25ce3d11420..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileManager.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.resource;
-
-public class PipeFileManager {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
index 1666ac09e22..e7d961b3c9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.resource;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.config.PipeConfig;
import java.io.File;
@@ -32,7 +31,7 @@ import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
-public class PipeTsFileResourceManager {
+public class PipeFileResourceManager {
private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new HashMap<>();
@@ -181,7 +180,6 @@ public class PipeTsFileResourceManager {
* @param hardlinkOrCopiedFile the copied or hardlinked file
* @return the reference count of the file
*/
- @TestOnly
public synchronized int getFileReferenceCount(File hardlinkOrCopiedFile) {
return hardlinkOrCopiedFileToReferenceMap.getOrDefault(hardlinkOrCopiedFile.getPath(), 0);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
deleted file mode 100644
index 984c3fbaf24..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeRaftlogHolder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.resource;
-
-public class PipeRaftlogHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 2bcdfe4c366..61b4e61a04e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -21,16 +21,16 @@ package org.apache.iotdb.db.pipe.resource;
public class PipeResourceManager {
- private final PipeTsFileResourceManager pipeTsFileResourceManager;
+ private final PipeFileResourceManager pipeFileResourceManager;
- public static PipeTsFileResourceManager tsfile() {
- return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager;
+ public static PipeFileResourceManager file() {
+ return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
}
///////////////////////////// SINGLETON /////////////////////////////
private PipeResourceManager() {
- pipeTsFileResourceManager = new PipeTsFileResourceManager();
+ pipeFileResourceManager = new PipeFileResourceManager();
}
private static class PipeResourceManagerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
deleted file mode 100644
index 78e0a7c612d..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeWALHolder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.resource;
-
-public class PipeWALHolder {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index e2ceb644808..d9f2ea84bc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -67,20 +68,29 @@ public class PipeConnectorSubtask extends PipeSubtask {
}
try {
- if (event instanceof TabletInsertionEvent) {
- outputPipeConnector.transfer((TabletInsertionEvent) event);
- } else if (event instanceof TsFileInsertionEvent) {
- outputPipeConnector.transfer((TsFileInsertionEvent) event);
- } else if (event instanceof DeletionEvent) {
- outputPipeConnector.transfer((DeletionEvent) event);
- } else {
- throw new RuntimeException("Unsupported event type: " + event.getClass().getName());
+ switch (event.getType()) {
+ case TABLET_INSERTION:
+ outputPipeConnector.transfer((TabletInsertionEvent) event);
+ break;
+ case TSFILE_INSERTION:
+ outputPipeConnector.transfer((TsFileInsertionEvent) event);
+ break;
+ case DELETION:
+ outputPipeConnector.transfer((DeletionEvent) event);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported event type: " + event.getClass().getName());
}
} catch (Exception e) {
e.printStackTrace();
throw new PipeException(
"Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
e);
+ } finally {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event).decreaseReferenceCount(PipeConnectorSubtask.class.getName());
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 3b7a59aa9e4..4d15d3a3528 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.task.subtask;
-import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -58,26 +58,30 @@ public class PipeProcessorSubtask extends PipeSubtask {
return;
}
- if (event instanceof PipeRealtimeCollectEvent) {
- // dispatch the event
- event = ((PipeRealtimeCollectEvent) event).getEvent();
- }
-
try {
- if (event instanceof TabletInsertionEvent) {
- pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
- } else if (event instanceof TsFileInsertionEvent) {
- pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
- } else if (event instanceof DeletionEvent) {
- pipeProcessor.process((DeletionEvent) event, outputEventCollector);
- } else {
- throw new RuntimeException("Unsupported event type: " + event.getClass().getName());
+ switch (event.getType()) {
+ case TABLET_INSERTION:
+ pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
+ break;
+ case TSFILE_INSERTION:
+ pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
+ break;
+ case DELETION:
+ pipeProcessor.process((DeletionEvent) event, outputEventCollector);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported event type: " + event.getClass().getName());
}
} catch (Exception e) {
e.printStackTrace();
throw new PipeException(
"Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
e);
+ } finally {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event).decreaseReferenceCount(PipeProcessorSubtask.class.getName());
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index f376feb49ca..741e4ddb1b1 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -128,12 +128,12 @@ public class CachedSchemaPatternMatcherTest {
new PipeRealtimeCollectEvent(
null, null, Collections.singletonMap("root." + i, measurements));
long startTime = System.currentTimeMillis();
- matcher.match(event);
+ matcher.match(event).forEach(collector -> collector.collect(event));
totalTime += (System.currentTimeMillis() - startTime);
}
PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null, null, deviceMap);
long startTime = System.currentTimeMillis();
- matcher.match(event);
+ matcher.match(event).forEach(collector -> collector.collect(event));
totalTime += (System.currentTimeMillis() - startTime);
}
System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 214b8b0774e..f4e7f7f6843 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.pipe.core.collector;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -41,6 +43,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -57,26 +61,37 @@ public class PipeRealtimeCollectTest {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeCollectTest.class);
- private final String dataRegion1 = "dataRegion-1";
- private final String dataRegion2 = "dataRegion-2";
+ private final String dataRegion1 = "1";
+ private final String dataRegion2 = "2";
private final String pattern1 = "root.sg.d";
private final String pattern2 = "root.sg.d.a";
private final String[] device = new String[] {"root", "sg", "d"};
private final AtomicBoolean alive = new AtomicBoolean();
+ private File tmpDir;
+ private File tsFileDir;
private ExecutorService writeService;
private ExecutorService listenerService;
@Before
- public void setUp() {
+ public void setUp() throws IOException {
writeService = Executors.newFixedThreadPool(2);
listenerService = Executors.newFixedThreadPool(4);
+ tmpDir = new File(Files.createTempDirectory("pipeRealtimeCollect").toString());
+ tsFileDir =
+ new File(
+ tmpDir.getPath()
+ + File.separator
+ + IoTDBConstant.SEQUENCE_FLODER_NAME
+ + File.separator
+ + "root.sg");
}
@After
public void tearDown() {
writeService.shutdownNow();
listenerService.shutdownNow();
+ FileUtils.deleteDirectory(tmpDir);
}
@Test
@@ -140,7 +155,8 @@ public class PipeRealtimeCollectTest {
int writeNum = 10;
List<Future<?>> writeFutures =
Arrays.asList(
- write2DataRegion(writeNum, dataRegion1), write2DataRegion(writeNum, dataRegion2));
+ write2DataRegion(writeNum, dataRegion1, 0),
+ write2DataRegion(writeNum, dataRegion2, 0));
alive.set(true);
List<Future<?>> listenFutures =
@@ -175,7 +191,8 @@ public class PipeRealtimeCollectTest {
// test result of collector 0 - 3
writeFutures =
Arrays.asList(
- write2DataRegion(writeNum, dataRegion1), write2DataRegion(writeNum, dataRegion2));
+ write2DataRegion(writeNum, dataRegion1, writeNum),
+ write2DataRegion(writeNum, dataRegion2, writeNum));
alive.set(true);
listenFutures =
@@ -211,12 +228,22 @@ public class PipeRealtimeCollectTest {
}
}
- private Future<?> write2DataRegion(int writeNum, String dataRegionId) {
+ private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
+ File dataRegionDir =
+ new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
+ boolean ignored = dataRegionDir.mkdirs();
return writeService.submit(
() -> {
- for (int i = 0; i < writeNum; ++i) {
- TsFileResource resource =
- new TsFileResource(new File(dataRegionId, String.format("%s-%s-0-0.tsfile", i, i)));
+ for (int i = startNum; i < startNum + writeNum; ++i) {
+ File tsFile = new File(dataRegionDir, String.format("%s-%s-0-0.tsfile", i, i));
+ try {
+ boolean ignored1 = tsFile.createNewFile();
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ TsFileResource resource = new TsFileResource(tsFile);
resource.updateStartTime(String.join(TsFileConstant.PATH_SEPARATOR, device), 0);
PipeInsertionDataNodeListener.getInstance()
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
similarity index 77%
rename from server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
rename to server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
index e4fc6b67143..b2441aa9d9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
@@ -47,7 +47,7 @@ import java.nio.file.Files;
import static org.junit.Assert.fail;
-public class PipeTsFileResourceManagerTest {
+public class PipeFileResourceManagerTest {
private static final String ROOT_DIR = "target" + File.separator + "PipeTsFileHolderTest";
private static final String SEQUENCE_DIR =
@@ -55,11 +55,11 @@ public class PipeTsFileResourceManagerTest {
private static final String TS_FILE_NAME = SEQUENCE_DIR + File.separator + "test.tsfile";
private static final String MODS_FILE_NAME = TS_FILE_NAME + ".mods";
- private PipeTsFileResourceManager pipeTsFileResourceManager;
+ private PipeFileResourceManager pipeFileResourceManager;
@Before
public void setUp() throws Exception {
- pipeTsFileResourceManager = new PipeTsFileResourceManager();
+ pipeFileResourceManager = new PipeFileResourceManager();
createTsfile(TS_FILE_NAME);
creatModsFile(MODS_FILE_NAME);
@@ -150,27 +150,27 @@ public class PipeTsFileResourceManagerTest {
public void testIncreaseTsfile() throws IOException {
File originTsfile = new File(TS_FILE_NAME);
File originModFile = new File(MODS_FILE_NAME);
- Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(originTsfile));
- Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(originModFile));
+ Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(originTsfile));
+ Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(originModFile));
- File pipeTsfile = pipeTsFileResourceManager.increaseFileReference(originTsfile, true);
- File pipeModFile = pipeTsFileResourceManager.increaseFileReference(originModFile, false);
- Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
- Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+ File pipeTsfile = pipeFileResourceManager.increaseFileReference(originTsfile, true);
+ File pipeModFile = pipeFileResourceManager.increaseFileReference(originModFile, false);
+ Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertTrue(Files.exists(originTsfile.toPath()));
Assert.assertTrue(Files.exists(originModFile.toPath()));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
Assert.assertTrue(Files.exists(pipeModFile.toPath()));
// test use hardlinkTsFile to increase reference counts
- pipeTsFileResourceManager.increaseFileReference(pipeTsfile, true);
- Assert.assertEquals(2, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
+ pipeFileResourceManager.increaseFileReference(pipeTsfile, true);
+ Assert.assertEquals(2, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
Assert.assertTrue(Files.exists(originTsfile.toPath()));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
// test use copyFile to increase reference counts
- pipeTsFileResourceManager.increaseFileReference(pipeModFile, false);
- Assert.assertEquals(2, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+ pipeFileResourceManager.increaseFileReference(pipeModFile, false);
+ Assert.assertEquals(2, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertTrue(Files.exists(originModFile.toPath()));
Assert.assertTrue(Files.exists(pipeModFile.toPath()));
}
@@ -180,15 +180,15 @@ public class PipeTsFileResourceManagerTest {
File originFile = new File(TS_FILE_NAME);
File originModFile = new File(MODS_FILE_NAME);
- pipeTsFileResourceManager.decreaseFileReference(originFile);
- pipeTsFileResourceManager.decreaseFileReference(originModFile);
- Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(originFile));
- Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(originModFile));
+ pipeFileResourceManager.decreaseFileReference(originFile);
+ pipeFileResourceManager.decreaseFileReference(originModFile);
+ Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(originFile));
+ Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(originModFile));
- File pipeTsfile = pipeTsFileResourceManager.increaseFileReference(originFile, true);
- File pipeModFile = pipeTsFileResourceManager.increaseFileReference(originModFile, false);
- Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
- Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+ File pipeTsfile = pipeFileResourceManager.increaseFileReference(originFile, true);
+ File pipeModFile = pipeFileResourceManager.increaseFileReference(originModFile, false);
+ Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
Assert.assertTrue(Files.exists(pipeModFile.toPath()));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
@@ -199,17 +199,17 @@ public class PipeTsFileResourceManagerTest {
Assert.assertFalse(Files.exists(originFile.toPath()));
Assert.assertFalse(Files.exists(originModFile.toPath()));
- Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
- Assert.assertEquals(1, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+ Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(1, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertFalse(Files.exists(originFile.toPath()));
Assert.assertFalse(Files.exists(originModFile.toPath()));
Assert.assertTrue(Files.exists(pipeTsfile.toPath()));
Assert.assertTrue(Files.exists(pipeModFile.toPath()));
- pipeTsFileResourceManager.decreaseFileReference(pipeTsfile);
- pipeTsFileResourceManager.decreaseFileReference(pipeModFile);
- Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(pipeTsfile));
- Assert.assertEquals(0, pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
+ pipeFileResourceManager.decreaseFileReference(pipeTsfile);
+ pipeFileResourceManager.decreaseFileReference(pipeModFile);
+ Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(pipeTsfile));
+ Assert.assertEquals(0, pipeFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertFalse(Files.exists(originFile.toPath()));
Assert.assertFalse(Files.exists(originModFile.toPath()));
Assert.assertFalse(Files.exists(pipeTsfile.toPath()));