You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/12/27 02:19:25 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5208]Fix file handles increase when TsFileResource is degraded and compacted with fast performer (#8574)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new f07b023790 [To rel/1.0][IOTDB-5208]Fix file handles increase when TsFileResource is degraded and compacted with fast performer (#8574)
f07b023790 is described below
commit f07b023790e0dc28ccab6b5bd1480092459c122b
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Tue Dec 27 10:19:20 2022 +0800
[To rel/1.0][IOTDB-5208]Fix file handles increase when TsFileResource is degraded and compacted with fast performer (#8574)
---
.../db/engine/compaction/CompactionUtils.java | 14 +-
.../compaction/cross/CrossSpaceCompactionTask.java | 2 -
.../performer/impl/FastCompactionPerformer.java | 2 +-
.../writer/AbstractCrossCompactionWriter.java | 10 +-
.../writer/FastCrossCompactionWriter.java | 14 +-
.../writer/ReadPointCrossCompactionWriter.java | 7 +
.../iotdb/db/query/control/FileReaderManager.java | 11 +
.../FastCrossCompactionPerformerTest.java | 258 +++++++++++++
.../FastInnerCompactionPerformerTest.java | 186 ++++++++++
.../ReadPointCompactionPerformerTest.java | 404 +++++++++++++++++++++
10 files changed, 887 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 7787a7611a..ee33684cbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -177,7 +176,7 @@ public class CompactionUtils {
logger.info("{} [Compaction] Compaction starts to delete real file ", storageGroupName);
boolean result = true;
for (TsFileResource mergeTsFile : mergeTsFiles) {
- if (!deleteTsFile(mergeTsFile)) {
+ if (!mergeTsFile.remove()) {
result = false;
}
logger.info(
@@ -186,17 +185,6 @@ public class CompactionUtils {
return result;
}
- public static boolean deleteTsFile(TsFileResource seqFile) {
- try {
- FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
- seqFile.remove();
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- return false;
- }
- return true;
- }
-
/** Delete all modification files for source files */
public static void deleteModificationForSourceFile(
Collection<TsFileResource> sourceFiles, String storageGroupName) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index 4a555525d3..bd9c3a2ff7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
-import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.commons.io.FileUtils;
@@ -331,7 +330,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
private long deleteOldFiles(List<TsFileResource> tsFileResourceList) throws IOException {
long totalSize = 0;
for (TsFileResource tsFileResource : tsFileResourceList) {
- FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
totalSize += tsFileResource.getTsFileSize();
tsFileResource.remove();
LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java
index 84f77b2a82..501d622c29 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java
@@ -115,7 +115,7 @@ public class FastCompactionPerformer
new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap);
AbstractCompactionWriter compactionWriter =
isCrossCompaction
- ? new FastCrossCompactionWriter(targetFiles, seqFiles)
+ ? new FastCrossCompactionWriter(targetFiles, seqFiles, readerCacheMap)
: new FastInnerCompactionWriter(targetFiles.get(0))) {
while (deviceIterator.hasNextDevice()) {
checkThreadInterrupted();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java
index f1ef4433e1..cb8a102882 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java
@@ -22,10 +22,10 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -213,10 +213,10 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr
currentDeviceEndTime[fileIndex] = seqTsFileResources.get(fileIndex).getEndTime(deviceId);
} else {
long endTime = Long.MIN_VALUE;
+ // Fast compaction gets the reader from the cache map, while read point compaction gets it
+ // from FileReaderManager
Map<String, TimeseriesMetadata> deviceMetadataMap =
- FileReaderManager.getInstance()
- .get(seqTsFileResources.get(fileIndex).getTsFilePath(), true)
- .readDeviceMetadata(deviceId);
+ getFileReader(seqTsFileResources.get(fileIndex)).readDeviceMetadata(deviceId);
for (Map.Entry<String, TimeseriesMetadata> entry : deviceMetadataMap.entrySet()) {
long tmpStartTime = entry.getValue().getStatistics().getStartTime();
long tmpEndTime = entry.getValue().getStatistics().getEndTime();
@@ -239,4 +239,6 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr
}
return totalSize;
}
+
+ protected abstract TsFileSequenceReader getFileReader(TsFileResource resource) throws IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/FastCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/FastCrossCompactionWriter.java
index 7498d4c79f..af43abfda1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/FastCrossCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/FastCrossCompactionWriter.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -32,13 +33,19 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
public class FastCrossCompactionWriter extends AbstractCrossCompactionWriter {
+ // Only used for fast compaction performer
+ protected Map<TsFileResource, TsFileSequenceReader> readerMap;
public FastCrossCompactionWriter(
- List<TsFileResource> targetResources, List<TsFileResource> seqSourceResources)
+ List<TsFileResource> targetResources,
+ List<TsFileResource> seqSourceResources,
+ Map<TsFileResource, TsFileSequenceReader> readerMap)
throws IOException {
super(targetResources, seqSourceResources);
+ this.readerMap = readerMap;
}
@Override
@@ -47,6 +54,11 @@ public class FastCrossCompactionWriter extends AbstractCrossCompactionWriter {
throw new RuntimeException("Does not support this method in FastCrossCompactionWriter");
}
+ @Override
+ protected TsFileSequenceReader getFileReader(TsFileResource resource) {
+ return readerMap.get(resource);
+ }
+
/**
* Flush nonAligned chunk to tsfile directly. Return whether the chunk is flushed to tsfile
* successfully or not. Return false if the unsealed chunk is too small or the end time of chunk
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/ReadPointCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/ReadPointCrossCompactionWriter.java
index 56f3464809..22b85f81f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/ReadPointCrossCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/ReadPointCrossCompactionWriter.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.db.engine.compaction.writer;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -61,6 +63,11 @@ public class ReadPointCrossCompactionWriter extends AbstractCrossCompactionWrite
lastTime[subTaskId] = timestamps.getEndTime();
}
+ @Override
+ protected TsFileSequenceReader getFileReader(TsFileResource resource) throws IOException {
+ return FileReaderManager.getInstance().get(resource.getTsFilePath(), true);
+ }
+
@Override
public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) {
throw new RuntimeException("Does not support this method in ReadPointCrossCompactionWriter");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index ede43b07f1..73f0978952 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.control;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -253,6 +254,16 @@ public class FileReaderManager {
}
}
+ @TestOnly
+ public Map<String, TsFileSequenceReader> getClosedFileReaderMap() {
+ return closedFileReaderMap;
+ }
+
+ @TestOnly
+ public Map<String, TsFileSequenceReader> getUnclosedFileReaderMap() {
+ return unclosedFileReaderMap;
+ }
+
private static class FileReaderManagerHelper {
private static final FileReaderManager INSTANCE = new FileReaderManager();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastCrossCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastCrossCompactionPerformerTest.java
index f744dda9ce..0fdc03d645 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastCrossCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastCrossCompactionPerformerTest.java
@@ -139,10 +139,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
tsBlockReader =
@@ -253,10 +256,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -448,10 +454,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -640,10 +649,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -822,10 +834,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
@@ -997,10 +1012,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
@@ -1130,6 +1148,8 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
@@ -1205,6 +1225,8 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(2, targetResources.size());
@@ -1339,10 +1361,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
tsBlockReader =
@@ -1467,10 +1492,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1669,10 +1697,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
@@ -1906,10 +1937,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
@@ -2021,10 +2055,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
Assert.assertEquals(2, targetResources.size());
@@ -2147,10 +2184,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -2286,10 +2326,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -2404,10 +2447,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -2539,10 +2585,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -2696,10 +2745,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
Assert.assertEquals(3, targetResources.size());
@@ -2939,10 +2991,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -3207,10 +3262,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -3397,10 +3455,13 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -3711,12 +3772,16 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
}
}
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
targetResources.addAll(
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
ICompactionPerformer performer =
new FastCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
@@ -3790,6 +3855,199 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
}
}
+ /**
+ * Total 4 seq files and 5 unseq files with different nonAligned timeseries, compacted after
+ * degrading every source time index; no file reader may stay registered afterwards.
+ * <p>Seq files<br>
+ * first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
+ * is 0 ~ 299 and 350 ~ 649.<br>
+ * third and fourth file has d0 ~ d3 and s0 ~ s4, time range is 700 ~ 999 and 1050 ~ 1349, value
+ * range is 700 ~ 999 and 1050 ~ 1349.<br>
+ *
+ * <p>UnSeq files<br>
+ * first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
+ * ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
+ * fourth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
+ * 20450 ~ 20549 and 20550 ~ 20649.
+ */
+ @Test
+ public void testCrossSpaceCompactionWithFileTimeIndex() throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, false);
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 5; j++) {
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.INT64));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.INT64,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ seqResources,
+ unseqResources,
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (i == 0
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
+ } else if ((i < 3 && j < 4)
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
+ } else {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ }
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i < 2 && j < 3) {
+ assertEquals(1280, count);
+ } else if (i < 1 && j < 4) {
+ assertEquals(1230, count);
+ } else if (i == 0) {
+ assertEquals(800, count);
+ } else if ((i == 1 && j == 4)) {
+ assertEquals(600, count);
+ } else if (i < 3 && j < 4) {
+ assertEquals(1200, count);
+ } else {
+ assertEquals(600, count);
+ }
+ }
+ }
+
+ // degrade the time index of every seq and unseq source file before compacting
+ for (TsFileResource resource : seqResources) {
+ resource.degradeTimeIndex();
+ }
+ for (TsFileResource resource : unseqResources) {
+ resource.degradeTimeIndex();
+ }
+
+ targetResources.addAll(
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources));
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 2; i < 4; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 5; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ Long.MIN_VALUE);
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.INT64));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.INT64,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ iterator.currentTime());
+ if (i == 0
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
+ } else if ((i < 3 && j < 4)
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
+ } else {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ }
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i < 2 && j < 3) {
+ assertEquals(1280, count);
+ } else if (i < 1 && j < 4) {
+ assertEquals(1230, count);
+ } else if (i == 0) {
+ assertEquals(800, count);
+ } else if ((i == 1 && j == 4)) {
+ assertEquals(600, count);
+ } else if (i < 3 && j < 4) {
+ assertEquals(1200, count);
+ } else {
+ assertEquals(600, count);
+ }
+ }
+ }
+ }
+
private void validateSeqFiles() {
TsFileValidationTool.clearMap(true);
List<File> files = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastInnerCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastInnerCompactionPerformerTest.java
index 8bb4839ae4..42fec1c715 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastInnerCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastInnerCompactionPerformerTest.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -116,6 +117,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
assertEquals(500, count);
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -128,6 +130,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(true);
@@ -213,6 +217,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -225,6 +230,151 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
+ List<TsFileResource> targetResources = tsFileManager.getTsFileList(true);
+ validateSeqFiles(true);
+
+ assertEquals(
+ 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ assertEquals(
+ 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ assertEquals(
+ 250, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ assertEquals(
+ 600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ assertEquals(
+ 600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+ for (int i = 0; i < 5; i++) {
+ assertEquals(
+ 749, targetResources.get(0).getEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i));
+ }
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ List<IMeasurementSchema> schemas = new ArrayList<>();
+ schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.INT64));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.INT64,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ tsFileManager.getTsFileList(true),
+ tsFileManager.getTsFileList(false),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
+ } else {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ }
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i < 2 && j < 3) {
+ assertEquals(400, count);
+ } else if (i < 3 && j < 5) {
+ assertEquals(200, count);
+ } else if (i < 5 && j < 5) {
+ assertEquals(100, count);
+ }
+ }
+ }
+ }
+
+ /*
+ Total 6 seq files, each file has different nonAligned timeseries.
+ First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is 0 ~ 99 and 150 ~ 249.
+ Third and Fourth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
+ Fifth and Sixth file: d0 ~ d4 and s0 ~ s5, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+ Timeseries d[0-4].s5 are deleted before compaction.
+ */
+ @Test
+ public void testSeqInnerSpaceCompactionWithFileTimeIndex() throws Exception {
+ registerTimeseriesInMManger(5, 5, false);
+ createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
+ createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
+ createFiles(2, 5, 6, 50, 600, 800, 50, 50, false, true);
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.INT64));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.INT64,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ seqResources,
+ unseqResources,
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
+ } else {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ }
+ count++;
+ iterator.next();
+ }
+ }
+
+ tsBlockReader.close();
+ if (i < 2 && j < 3) {
+ assertEquals(400, count);
+ } else if (i < 3) {
+ assertEquals(200, count);
+ } else {
+ assertEquals(100, count);
+ }
+ }
+ }
+
+ // degrade time index
+ for (TsFileResource resource : seqResources) {
+ resource.degradeTimeIndex();
+ }
+ for (TsFileResource resource : unseqResources) {
+ resource.degradeTimeIndex();
+ }
+
+ // start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ seqResources,
+ true,
+ new FastCompactionPerformer(false),
+ new AtomicInteger(0),
+ 0);
+ task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
List<TsFileResource> targetResources = tsFileManager.getTsFileList(true);
validateSeqFiles(true);
@@ -326,6 +476,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -338,6 +489,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
List<TsFileResource> targetResources = tsFileManager.getTsFileList(false);
validateSeqFiles(false);
@@ -440,6 +593,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -452,6 +606,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(true);
for (int i = 0; i < 9; i++) {
@@ -585,6 +741,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -597,6 +754,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(false);
for (int i = 0; i < 5; i++) {
@@ -719,6 +878,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -731,6 +891,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(false);
for (int i = 0; i < 5; i++) {
@@ -839,6 +1001,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -851,6 +1014,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(true);
for (int i = 0; i < 5; i++) {
@@ -929,6 +1094,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -941,6 +1107,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
List<TsFileResource> targetResources = tsFileManager.getTsFileList(true);
validateSeqFiles(true);
@@ -1048,6 +1216,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -1060,6 +1229,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(true);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1176,6 +1347,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -1188,6 +1360,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(true);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1309,6 +1483,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -1321,6 +1496,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(false);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1492,6 +1669,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -1504,6 +1682,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(false);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1655,6 +1835,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -1667,6 +1848,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(false);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1770,6 +1953,7 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
}
// start compacting
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
InnerSpaceCompactionTask task =
@@ -1782,6 +1966,8 @@ public class FastInnerCompactionPerformerTest extends AbstractCompactionTest {
new AtomicInteger(0),
0);
task.start();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
validateSeqFiles(false);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index 1e53a8ee37..48996b61a3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -131,6 +131,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
tsBlockReader =
@@ -220,6 +222,141 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ assertEquals(
+ 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ assertEquals(
+ 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ assertEquals(
+ 250, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ assertEquals(
+ 600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ assertEquals(
+ 600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"));
+ for (int i = 0; i < 5; i++) {
+ assertEquals(
+ 749, targetResources.get(0).getEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i));
+ }
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ List<IMeasurementSchema> schemas = new ArrayList<>();
+ schemas.add(new MeasurementSchema("s" + j, TSDataType.INT64));
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.INT64));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.INT64,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
+ } else {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ }
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i < 2 && j < 3) {
+ assertEquals(400, count);
+ } else if (i < 3 && j < 5) {
+ assertEquals(200, count);
+ } else if (i < 5 && j < 5) {
+ assertEquals(100, count);
+ }
+ }
+ }
+ }
+
+ /*
+ Total 6 seq files, each file has different nonAligned timeseries.
+ First and Second file: d0 ~ d1 and s0 ~ s2, time range is 0 ~ 99 and 150 ~ 249, value range is 0 ~ 99 and 150 ~ 249.
+ Third and Fourth file: d0 ~ d2 and s0 ~ s4, time range is 250 ~ 299 and 350 ~ 399, value range is 250 ~ 299 and 350 ~ 399.
+ Fifth and Sixth file: d0 ~ d4 and s0 ~ s5, time range is 600 ~ 649 and 700 ~ 749, value range is 800 ~ 849 and 900 ~ 949.
+ Timeseries d[0-4].s5 are deleted before compaction.
+ */
+ @Test
+ public void testSeqInnerSpaceCompactionWithFileTimeIndex() throws Exception {
+ registerTimeseriesInMManger(5, 5, false);
+ createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
+ createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
+ createFiles(2, 5, 6, 50, 600, 800, 50, 50, false, true);
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.INT64));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.INT64,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ seqResources,
+ unseqResources,
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (iterator.currentTime() >= 600) {
+ assertEquals(iterator.currentTime() + 200, iterator.currentValue());
+ } else {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ }
+ count++;
+ iterator.next();
+ }
+ }
+
+ tsBlockReader.close();
+ if (i < 2 && j < 3) {
+ assertEquals(400, count);
+ } else if (i < 3) {
+ assertEquals(200, count);
+ } else {
+ assertEquals(100, count);
+ }
+ }
+ }
+
+ // degrade time index
+ for (TsFileResource resource : seqResources) {
+ resource.degradeTimeIndex();
+ }
+ for (TsFileResource resource : unseqResources) {
+ resource.degradeTimeIndex();
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
assertEquals(
0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
@@ -324,6 +461,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 2; i++) {
@@ -430,6 +569,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 9; i++) {
@@ -567,6 +708,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 5; i++) {
@@ -693,6 +836,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = 0; i < 5; i++) {
@@ -805,6 +950,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
for (int i = 0; i < 5; i++) {
@@ -887,6 +1034,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -997,6 +1146,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1117,6 +1268,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1242,6 +1395,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1417,6 +1572,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1572,6 +1729,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1679,6 +1838,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -1773,6 +1934,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
tsBlockReader =
@@ -1887,6 +2050,200 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+
+ List<String> deviceIdList = new ArrayList<>();
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1");
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertFalse(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2");
+ deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
+ for (int i = 2; i < 4; i++) {
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"));
+ Assert.assertTrue(
+ targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"));
+ check(targetResources.get(i), deviceIdList);
+ }
+
+ Map<String, Long> measurementMaxTime = new HashMap<>();
+
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 5; j++) {
+ measurementMaxTime.putIfAbsent(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ Long.MIN_VALUE);
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.INT64));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.INT64,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ targetResources,
+ new ArrayList<>(),
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (measurementMaxTime.get(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j)
+ >= iterator.currentTime()) {
+ Assert.fail();
+ }
+ measurementMaxTime.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i + PATH_SEPARATOR + "s" + j,
+ iterator.currentTime());
+ if (i == 0
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
+ } else if ((i < 3 && j < 4)
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
+ } else {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ }
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i < 2 && j < 3) {
+ assertEquals(1280, count);
+ } else if (i < 1 && j < 4) {
+ assertEquals(1230, count);
+ } else if (i == 0) {
+ assertEquals(800, count);
+ } else if ((i == 1 && j == 4)) {
+ assertEquals(600, count);
+ } else if (i < 3 && j < 4) {
+ assertEquals(1200, count);
+ } else {
+ assertEquals(600, count);
+ }
+ }
+ }
+ }
+
+ /**
+ * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries.
+ *
+ * <p>Seq files<br>
+ * first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range
+ * is 0 ~ 299 and 350 ~ 649.<br>
+ * third and fourth file has d0 ~ d3 and s0 ~ s4, time range is 700 ~ 999 and 1050 ~ 1349, value
+ * range is 700 ~ 999 and 1050 ~ 1349.<br>
+ *
+ * <p>UnSeq files<br>
+ * first, second and third file has d0 ~ d2 and s0 ~ s3, time range is 20 ~ 219, 250 ~ 449 and 480
+ * ~ 679, value range is 10020 ~ 10219, 10250 ~ 10449 and 10480 ~ 10679.<br>
+ * fourth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is
+ * 20450 ~ 20549 and 20550 ~ 20649.
+ */
+ @Test
+ public void testCrossSpaceCompactionWithFileTimeIndex() throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
+ registerTimeseriesInMManger(4, 5, false);
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 5; j++) {
+ PartialPath path =
+ new MeasurementPath(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i,
+ "s" + j,
+ new MeasurementSchema("s" + j, TSDataType.INT64));
+ IDataBlockReader tsBlockReader =
+ new SeriesDataBlockReader(
+ path,
+ TSDataType.INT64,
+ FragmentInstanceContext.createFragmentInstanceContextForCompaction(
+ EnvironmentUtils.TEST_QUERY_CONTEXT.getQueryId()),
+ seqResources,
+ unseqResources,
+ true);
+ int count = 0;
+ while (tsBlockReader.hasNextBatch()) {
+ TsBlock block = tsBlockReader.nextBatch();
+ IBatchDataIterator iterator = block.getTsBlockSingleColumnIterator();
+ while (iterator.hasNext()) {
+ if (i == 0
+ && ((450 <= iterator.currentTime() && iterator.currentTime() < 550)
+ || (550 <= iterator.currentTime() && iterator.currentTime() < 650))) {
+ assertEquals(iterator.currentTime() + 20000, iterator.currentValue());
+ } else if ((i < 3 && j < 4)
+ && ((20 <= iterator.currentTime() && iterator.currentTime() < 220)
+ || (250 <= iterator.currentTime() && iterator.currentTime() < 450)
+ || (480 <= iterator.currentTime() && iterator.currentTime() < 680))) {
+ assertEquals(iterator.currentTime() + 10000, iterator.currentValue());
+ } else {
+ assertEquals(iterator.currentTime(), iterator.currentValue());
+ }
+ count++;
+ iterator.next();
+ }
+ }
+ tsBlockReader.close();
+ if (i < 2 && j < 3) {
+ assertEquals(1280, count);
+ } else if (i < 1 && j < 4) {
+ assertEquals(1230, count);
+ } else if (i == 0) {
+ assertEquals(800, count);
+ } else if ((i == 1 && j == 4)) {
+ assertEquals(600, count);
+ } else if (i < 3 && j < 4) {
+ assertEquals(1200, count);
+ } else {
+ assertEquals(600, count);
+ }
+ }
+ }
+
+ // degrade time index
+ for (TsFileResource resource : seqResources) {
+ resource.degradeTimeIndex();
+ }
+ for (TsFileResource resource : unseqResources) {
+ resource.degradeTimeIndex();
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
+ performer.setSummary(new CompactionTaskSummary());
+ performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -2083,6 +2440,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -2276,6 +2635,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -2459,6 +2820,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
@@ -2641,6 +3004,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
@@ -2813,6 +3178,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
tsBlockReader =
@@ -2941,6 +3308,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -3143,6 +3512,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
@@ -3380,6 +3751,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -3494,6 +3867,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
Assert.assertEquals(2, targetResources.size());
@@ -3620,6 +3995,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -3759,6 +4136,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -3878,6 +4257,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -4016,6 +4397,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -4168,6 +4551,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
Assert.assertEquals(2, targetResources.size());
@@ -4378,6 +4763,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -4646,6 +5033,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -4830,6 +5219,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -5020,6 +5411,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
List<String> deviceIdList = new ArrayList<>();
@@ -5330,12 +5723,15 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
}
}
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
performer.setTargetFiles(targetResources);
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -5447,6 +5843,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
Assert.assertEquals(3, targetResources.size());
@@ -5576,6 +5974,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
targetResources.removeIf(resource -> resource == null);
Assert.assertEquals(3, targetResources.size());
@@ -5687,6 +6087,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(4, targetResources.size());
@@ -5763,6 +6165,8 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
+ Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
+ Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
Assert.assertEquals(2, targetResources.size());