You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/29 04:26:48 UTC
[iotdb] branch master updated: [IOTDB-3164] Manage the memory of cross space compaction in write memory controller (#6914)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f218f0bdb9 [IOTDB-3164] Manage the memory of cross space compaction in write memory controller (#6914)
f218f0bdb9 is described below
commit f218f0bdb917dd84478a0b51d27a87b952a2fae9
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Mon Aug 29 12:26:42 2022 +0800
[IOTDB-3164] Manage the memory of cross space compaction in write memory controller (#6914)
---
.../resources/conf/iotdb-datanode.properties | 11 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 43 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 23 +-
.../db/engine/compaction/CompactionScheduler.java | 8 +-
.../db/engine/compaction/CompactionUtils.java | 16 -
.../compaction/cross/CrossSpaceCompactionTask.java | 11 +
.../RewriteCrossSpaceCompactionSelector.java | 256 ++++-
.../selector/RewriteCompactionFileSelector.java | 285 -----
.../cross/utils/InplaceCompactionEstimator.java | 3 +-
.../compaction/task/ICompactionSelector.java | 5 +
.../iotdb/db/rescon/PrimitiveArrayManager.java | 2 +-
.../org/apache/iotdb/db/rescon/SystemInfo.java | 29 +-
.../engine/compaction/CompactionSchedulerTest.java | 194 ++--
.../compaction/CompactionTaskComparatorTest.java | 1 +
.../compaction/CompactionTaskManagerTest.java | 1 +
.../compaction/cross/CrossSpaceCompactionTest.java | 33 +-
.../cross/CrossSpaceCompactionValidationTest.java | 1111 +++++++++++---------
.../engine/compaction/cross/MergeUpgradeTest.java | 19 +-
.../cross/RewriteCompactionFileSelectorTest.java | 296 ++++--
.../cross/RewriteCrossSpaceCompactionTest.java | 4 +
20 files changed, 1262 insertions(+), 1089 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 6680a76f24..99afaf642d 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -437,6 +437,10 @@ timestamp_precision=ms
# In cluster mode, we recommend 5:3:1:1. In standalone mode, we recommend 8:1:0:1
# schema_memory_allocate_proportion=5:3:1:1
+# Memory allocation ratio in StorageEngine: MemTable, Compaction
+# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 8:2 , 7:3
+# storage_engine_memory_proportion=8:2
+
# Max number of concurrent writing time partitions in one storage group
# This parameter is used to control total memTable number when memory control is disabled
# The max number of memTable is 4 * concurrent_writing_time_partition * storage group number
@@ -598,13 +602,6 @@ timestamp_precision=ms
# Datatype: long, Unit: ms
# cross_compaction_file_selection_time_budget=30000
-# How much memory may be used in ONE merge task, 10% of maximum JVM memory by default.
-# This is only a rough estimation, starting from a relatively small value to avoid OOM.
-# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
-# total memory estimation of merge.
-# Datatype: long, Unit: Byte
-# cross_compaction_memory_budget=268435456
-
# How many threads will be set up to perform compaction, 10 by default.
# Set to 1 when less than or equal to 0.
# Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 16d9bf0f69..2a876324db 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -135,7 +135,7 @@ public class IoTDBConfig {
private int rpcMaxConcurrentClientNum = 65535;
/** Memory allocated for the write process */
- private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 4 / 10;
+ private long allocateMemoryForStorageEngine = Runtime.getRuntime().maxMemory() * 4 / 10;
/** Memory allocated for the read process */
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
@@ -154,6 +154,12 @@ public class IoTDBConfig {
/** Reject proportion for system */
private double rejectProportion = 0.8;
+ /** The proportion of write memory for memtable */
+ private double writeProportion = 0.8;
+
+ /** The proportion of write memory for compaction */
+ private double compactionProportion = 0.2;
+
/** If storage group increased more than this threshold, report to system. Unit: byte */
private long storageGroupSizeReportThreshold = 16 * 1024 * 1024L;
@@ -624,9 +630,6 @@ public class IoTDBConfig {
/** TEXT encoding when creating schema automatically is enabled */
private TSEncoding defaultTextEncoding = TSEncoding.PLAIN;
- /** How much memory (in byte) can be used by a single merge task. */
- private long crossCompactionMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.1);
-
/** How many threads will be set up to perform upgrade tasks. */
private int upgradeThreadNum = 1;
@@ -1717,14 +1720,6 @@ public class IoTDBConfig {
this.chunkBufferPoolEnable = chunkBufferPoolEnable;
}
- public long getCrossCompactionMemoryBudget() {
- return crossCompactionMemoryBudget;
- }
-
- public void setCrossCompactionMemoryBudget(long crossCompactionMemoryBudget) {
- this.crossCompactionMemoryBudget = crossCompactionMemoryBudget;
- }
-
public long getMergeIntervalSec() {
return mergeIntervalSec;
}
@@ -1765,12 +1760,12 @@ public class IoTDBConfig {
this.storageGroupSizeReportThreshold = storageGroupSizeReportThreshold;
}
- public long getAllocateMemoryForWrite() {
- return allocateMemoryForWrite;
+ public long getAllocateMemoryForStorageEngine() {
+ return allocateMemoryForStorageEngine;
}
- public void setAllocateMemoryForWrite(long allocateMemoryForWrite) {
- this.allocateMemoryForWrite = allocateMemoryForWrite;
+ public void setAllocateMemoryForStorageEngine(long allocateMemoryForStorageEngine) {
+ this.allocateMemoryForStorageEngine = allocateMemoryForStorageEngine;
}
public long getAllocateMemoryForSchema() {
@@ -3136,6 +3131,22 @@ public class IoTDBConfig {
this.driverTaskExecutionTimeSliceInMs = driverTaskExecutionTimeSliceInMs;
}
+ public double getWriteProportion() {
+ return writeProportion;
+ }
+
+ public void setWriteProportion(double writeProportion) {
+ this.writeProportion = writeProportion;
+ }
+
+ public double getCompactionProportion() {
+ return compactionProportion;
+ }
+
+ public void setCompactionProportion(double compactionProportion) {
+ this.compactionProportion = compactionProportion;
+ }
+
public long getThrottleThreshold() {
return throttleThreshold;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 769067d389..9cd2402c7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -590,11 +590,6 @@ public class IoTDBDescriptor {
Integer.parseInt(
properties.getProperty(
"upgrade_thread_num", Integer.toString(conf.getUpgradeThreadNum()))));
- conf.setCrossCompactionMemoryBudget(
- Long.parseLong(
- properties.getProperty(
- "cross_compaction_memory_budget",
- Long.toString(conf.getCrossCompactionMemoryBudget()))));
conf.setCrossCompactionFileSelectionTimeBudget(
Long.parseLong(
properties.getProperty(
@@ -1527,7 +1522,7 @@ public class IoTDBDescriptor {
}
long maxMemoryAvailable = Runtime.getRuntime().maxMemory();
if (proportionSum != 0) {
- conf.setAllocateMemoryForWrite(
+ conf.setAllocateMemoryForStorageEngine(
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
conf.setAllocateMemoryForRead(
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
@@ -1537,10 +1532,11 @@ public class IoTDBDescriptor {
}
logger.info("allocateMemoryForRead = {}", conf.getAllocateMemoryForRead());
- logger.info("allocateMemoryForWrite = {}", conf.getAllocateMemoryForWrite());
+ logger.info("allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine());
logger.info("allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema());
initSchemaMemoryAllocate(properties);
+ initStorageEngineAllocate(properties);
conf.setMaxQueryDeduplicatedPathNum(
Integer.parseInt(
@@ -1605,6 +1601,19 @@ public class IoTDBDescriptor {
}
}
+ private void initStorageEngineAllocate(Properties properties) {
+ String allocationRatio = properties.getProperty("storage_engine_memory_proportion", "8:2");
+ String[] proportions = allocationRatio.split(":");
+ int proportionForMemTable = Integer.parseInt(proportions[0].trim());
+ int proportionForCompaction = Integer.parseInt(proportions[1].trim());
+ conf.setWriteProportion(
+ ((double) (proportionForMemTable)
+ / (double) (proportionForCompaction + proportionForMemTable)));
+ conf.setCompactionProportion(
+ ((double) (proportionForCompaction)
+ / (double) (proportionForCompaction + proportionForMemTable)));
+ }
+
private void initSchemaMemoryAllocate(Properties properties) {
long schemaMemoryTotal = conf.getAllocateMemoryForSchema();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
index fd4bcf1c04..aa4eb75f72 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
@@ -149,19 +149,21 @@ public class CompactionScheduler {
crossSpaceCompactionSelector.selectCrossSpaceTask(
tsFileManager.getSequenceListByTimePartition(timePartition),
tsFileManager.getUnsequenceListByTimePartition(timePartition));
- for (Pair<List<TsFileResource>, List<TsFileResource>> selectedFilesPair : taskList) {
+ List<Long> memoryCost = crossSpaceCompactionSelector.getCompactionMemoryCost();
+ for (int i = 0, size = taskList.size(); i < size; ++i) {
CompactionTaskManager.getInstance()
.addTaskToWaitingQueue(
new CrossSpaceCompactionTask(
timePartition,
tsFileManager,
- selectedFilesPair.left,
- selectedFilesPair.right,
+ taskList.get(i).left,
+ taskList.get(i).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
CompactionTaskManager.currentTaskNum,
+ memoryCost.get(i),
tsFileManager.getNextCompactionTaskId()));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 23143a7b1d..5e5561b43e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -20,10 +20,6 @@ package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.constant.CrossCompactionSelector;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionResource;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceCompactionFileSelector;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
@@ -221,16 +217,4 @@ public class CompactionUtils {
}
}
}
-
- public static ICrossSpaceCompactionFileSelector getCrossSpaceFileSelector(
- long budget, CrossSpaceCompactionResource resource) {
- CrossCompactionSelector strategy =
- IoTDBDescriptor.getInstance().getConfig().getCrossCompactionSelector();
- switch (strategy) {
- case REWRITE:
- return new RewriteCompactionFileSelector(resource, budget);
- default:
- throw new UnsupportedOperationException("Unknown CrossSpaceFileStrategy " + strategy);
- }
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index 2ffe154f23..8982220e13 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -57,6 +58,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
protected List<TsFileResource> holdReadLockList = new ArrayList<>();
protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
protected long selectedFileSize = 0;
+ protected long memoryCost = 0L;
public CrossSpaceCompactionTask(
long timePartition,
@@ -65,6 +67,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
List<TsFileResource> selectedUnsequenceFiles,
ICrossCompactionPerformer performer,
AtomicInteger currentTaskNum,
+ long memoryCost,
long serialId) {
super(
tsFileManager.getStorageGroupName(),
@@ -79,10 +82,17 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
this.unseqTsFileResourceList = tsFileManager.getUnsequenceListByTimePartition(timePartition);
this.performer = performer;
this.hashCode = this.toString().hashCode();
+ this.memoryCost = memoryCost;
}
@Override
protected void doCompaction() {
+ try {
+ SystemInfo.getInstance().addCompactionMemoryCost(memoryCost);
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted when allocating memory for compaction", e);
+ return;
+ }
try {
if (!tsFileManager.isAllowCompaction()) {
return;
@@ -193,6 +203,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
false,
true);
} finally {
+ SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
releaseAllLock();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
index 9cd31bba23..35d4eb7653 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
@@ -22,19 +22,23 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
-import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.cross.ICrossSpaceSelector;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceCompactionFileSelector;
+import org.apache.iotdb.db.engine.compaction.cross.utils.AbstractCompactionEstimator;
+import org.apache.iotdb.db.engine.compaction.task.ICompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -47,6 +51,22 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
protected long timePartition;
protected TsFileManager tsFileManager;
+ private CrossSpaceCompactionResource resource;
+
+ private long totalCost;
+ private final long memoryBudget;
+ private final int maxCrossCompactionFileNum;
+
+ private List<TsFileResource> selectedUnseqFiles;
+ private List<TsFileResource> selectedSeqFiles;
+
+ private Collection<Integer> tmpSelectedSeqFiles;
+
+ private boolean[] seqSelected;
+ private int seqSelectedNum;
+
+ private AbstractCompactionEstimator compactionEstimator;
+
public RewriteCrossSpaceCompactionSelector(
String logicalStorageGroupName,
String dataRegionId,
@@ -56,6 +76,225 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
this.dataRegionId = dataRegionId;
this.timePartition = timePartition;
this.tsFileManager = tsFileManager;
+ this.memoryBudget =
+ SystemInfo.getInstance().getMemorySizeForCompaction()
+ / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
+ this.maxCrossCompactionFileNum =
+ IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum();
+ this.compactionEstimator =
+ ICompactionSelector.getCompactionEstimator(
+ IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer(), false);
+ }
+
+ /**
+ * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget. This process
+ * iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles as newly-added
+ * candidates and computes their estimated memory cost. If the current cost pluses the new cost is
+ * still under the budget, accept the unseqFile and the seqFiles as candidates, otherwise go to
+ * the next iteration. The memory cost of a file is calculated in two ways: The rough estimation:
+ * for a seqFile, the size of its metadata is used for estimation. Since in the worst case, the
+ * file only contains one timeseries and all its metadata will be loaded into memory with at most
+ * one actual data chunk (which is negligible) and writing the timeseries into a new file generate
+ * metadata of the similar size, so the size of all seqFiles' metadata (generated when writing new
+ * chunks) pluses the largest one (loaded when reading a timeseries from the seqFiles) is the
+ * total estimation of all seqFiles; for an unseqFile, since the merge reader may read all chunks
+ * of a series to perform a merge read, the whole file may be loaded into memory, so we use the
+ * file's length as the maximum estimation. The tight estimation: based on the rough estimation,
+ * we scan the file's metadata to count the number of chunks for each series, find the series
+ * which have the most chunks in the file and use its chunk proportion to refine the rough
+ * estimation. The rough estimation is performed first, if no candidates can be found using rough
+ * estimation, we run the selection again with tight estimation.
+ *
+ * @return two lists of TsFileResource, the former is selected seqFiles and the latter is selected
+ * unseqFiles or an empty array if there are no proper candidates by the budget.
+ */
+ private List[] select() throws MergeException {
+ long startTime = System.currentTimeMillis();
+ try {
+ LOGGER.debug(
+ "Selecting merge candidates from {} seqFile, {} unseqFiles",
+ resource.getSeqFiles().size(),
+ resource.getUnseqFiles().size());
+ selectSourceFiles();
+ if (selectedUnseqFiles.isEmpty()) {
+ LOGGER.debug("No merge candidates are found");
+ return new List[0];
+ }
+ } catch (IOException e) {
+ throw new MergeException(e);
+ } finally {
+ try {
+ compactionEstimator.clear();
+ } catch (IOException e) {
+ throw new MergeException(e);
+ }
+ }
+ LOGGER.info(
+ "Selected merge candidates, {} seqFiles, {} unseqFiles, total memory cost {}, "
+ + "time consumption {}ms",
+ selectedSeqFiles.size(),
+ selectedUnseqFiles.size(),
+ totalCost,
+ System.currentTimeMillis() - startTime);
+
+ return new List[] {selectedSeqFiles, selectedUnseqFiles};
+ }
+
+ /**
+ * In a preset time (30 seconds), for each unseqFile, find the list of seqFiles that overlap with
+ * it and have not been selected by the file selector of this compaction task. After finding each
+ * unseqFile and its corresponding overlap seqFile list, estimate the additional memory overhead
+ * that may be added by compacting them (preferably using the loop estimate), and if it does not
+ * exceed the memory overhead preset by the system for the compaction thread, put them into the
+ * selectedSeqFiles and selectedUnseqFiles.
+ */
+ void selectSourceFiles() throws IOException {
+ tmpSelectedSeqFiles = new HashSet<>();
+ seqSelected = new boolean[resource.getSeqFiles().size()];
+ seqSelectedNum = 0;
+ selectedSeqFiles = new ArrayList<>();
+ selectedUnseqFiles = new ArrayList<>();
+
+ totalCost = 0;
+
+ int unseqIndex = 0;
+ long startTime = System.currentTimeMillis();
+ long timeConsumption = 0;
+ long timeLimit =
+ IoTDBDescriptor.getInstance().getConfig().getCrossCompactionFileSelectionTimeBudget();
+ if (timeLimit < 0) {
+ timeLimit = Long.MAX_VALUE;
+ }
+ while (unseqIndex < resource.getUnseqFiles().size() && timeConsumption < timeLimit) {
+ // select next unseq files
+ TsFileResource unseqFile = resource.getUnseqFiles().get(unseqIndex);
+ if (!unseqFile.getTsFile().exists() || unseqFile.isDeleted()) {
+ break;
+ }
+
+ if (seqSelectedNum != resource.getSeqFiles().size()) {
+ selectOverlappedSeqFiles(unseqFile);
+ }
+ boolean isSeqFilesValid = checkIsSeqFilesValid();
+ if (!isSeqFilesValid) {
+ tmpSelectedSeqFiles.clear();
+ break;
+ }
+
+ // Filter out the selected seq files
+ for (int i = 0; i < seqSelected.length; i++) {
+ if (seqSelected[i]) {
+ tmpSelectedSeqFiles.remove(i);
+ }
+ }
+
+ List<TsFileResource> tmpSelectedSeqFileResources = new ArrayList<>();
+ for (int seqIndex : tmpSelectedSeqFiles) {
+ tmpSelectedSeqFileResources.add(resource.getSeqFiles().get(seqIndex));
+ }
+ long newCost =
+ compactionEstimator.estimateCrossCompactionMemory(tmpSelectedSeqFileResources, unseqFile);
+ if (!updateSelectedFiles(newCost, unseqFile)) {
+ // older unseq files must be merged before newer ones
+ break;
+ }
+
+ tmpSelectedSeqFiles.clear();
+ unseqIndex++;
+ timeConsumption = System.currentTimeMillis() - startTime;
+ }
+ for (int i = 0; i < seqSelected.length; i++) {
+ if (seqSelected[i]) {
+ selectedSeqFiles.add(resource.getSeqFiles().get(i));
+ }
+ }
+ }
+
+ private boolean updateSelectedFiles(long newCost, TsFileResource unseqFile) {
+ if (selectedUnseqFiles.size() == 0
+ || (seqSelectedNum + selectedUnseqFiles.size() + 1 + tmpSelectedSeqFiles.size()
+ <= maxCrossCompactionFileNum
+ && totalCost + newCost < memoryBudget)) {
+ selectedUnseqFiles.add(unseqFile);
+
+ for (Integer seqIdx : tmpSelectedSeqFiles) {
+ if (!seqSelected[seqIdx]) {
+ seqSelectedNum++;
+ seqSelected[seqIdx] = true;
+ }
+ }
+ totalCost += newCost;
+ LOGGER.debug(
+ "Adding a new unseqFile {} and seqFiles {} as candidates, new cost {}, total"
+ + " cost {}",
+ unseqFile,
+ tmpSelectedSeqFiles,
+ newCost,
+ totalCost);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * To avoid redundant data in seq files, cross space compaction should select all the seq files
+ * which have overlap with unseq files whether they are compacting or not. Therefore, before
+ * adding task into the queue, cross space compaction task should check whether source seq files
+ * are being compacted or not to speed up compaction.
+ */
+ private boolean checkIsSeqFilesValid() {
+ for (Integer seqIdx : tmpSelectedSeqFiles) {
+ if (resource.getSeqFiles().get(seqIdx).isCompactionCandidate()
+ || resource.getSeqFiles().get(seqIdx).isCompacting()
+ || !resource.getSeqFiles().get(seqIdx).isClosed()
+ || !resource.getSeqFiles().get(seqIdx).getTsFile().exists()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Put the index of the seqFile that has an overlap with the specific unseqFile and has not been
+ * selected by the file selector of the compaction task into the tmpSelectedSeqFiles list. To
+ * determine whether overlap exists is to traverse each device ChunkGroup in unseqFiles, and
+ * determine whether it overlaps with the same device ChunkGroup of each seqFile that are not
+ * selected by the compaction task, if so, select this seqFile.
+ *
+ * @param unseqFile the tsFileResource of unseqFile to be compacted
+ */
+ private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
+ for (String deviceId : unseqFile.getDevices()) {
+ long unseqStartTime = unseqFile.getStartTime(deviceId);
+ long unseqEndTime = unseqFile.getEndTime(deviceId);
+
+ boolean noMoreOverlap = false;
+ for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
+ TsFileResource seqFile = resource.getSeqFiles().get(i);
+ if (!seqFile.mayContainsDevice(deviceId)) {
+ continue;
+ }
+
+ long seqEndTime = seqFile.getEndTime(deviceId);
+ long seqStartTime = seqFile.getStartTime(deviceId);
+ if (!seqFile.isClosed()) {
+ // for unclosed file, only select those that overlap with the unseq file
+ if (unseqEndTime >= seqStartTime) {
+ tmpSelectedSeqFiles.add(i);
+ }
+ } else if (unseqEndTime <= seqEndTime) {
+ // if time range in unseq file is 10-20, seq file is 30-40, or
+ // time range in unseq file is 10-20, seq file is 15-25, then select this seq file and
+ // there is no more overlap later.
+ tmpSelectedSeqFiles.add(i);
+ noMoreOverlap = true;
+ } else if (unseqStartTime <= seqEndTime) {
+ // if time range in unseq file is 10-20, seq file is 0-15, then select this seq file and
+ // there may be overlap later.
+ tmpSelectedSeqFiles.add(i);
+ }
+ }
+ }
}
/**
@@ -86,15 +325,11 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
if (seqFileList.isEmpty() || unSeqFileList.isEmpty()) {
return Collections.emptyList();
}
- long budget = config.getCrossCompactionMemoryBudget();
long timeLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
- CrossSpaceCompactionResource compactionResource =
- new CrossSpaceCompactionResource(seqFileList, unSeqFileList, timeLowerBound);
+ this.resource = new CrossSpaceCompactionResource(seqFileList, unSeqFileList, timeLowerBound);
- ICrossSpaceCompactionFileSelector fileSelector =
- CompactionUtils.getCrossSpaceFileSelector(budget, compactionResource);
try {
- List[] mergeFiles = fileSelector.select();
+ List[] mergeFiles = select();
if (mergeFiles.length == 0) {
return Collections.emptyList();
}
@@ -117,4 +352,9 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
}
return Collections.emptyList();
}
+
+ @Override
+ public List<Long> getCompactionMemoryCost() {
+ return Collections.singletonList(totalCost);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
deleted file mode 100644
index e7ce60cb12..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.compaction.cross.rewrite.selector;
-
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionResource;
-import org.apache.iotdb.db.engine.compaction.cross.utils.AbstractCompactionEstimator;
-import org.apache.iotdb.db.engine.compaction.task.ICompactionSelector;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MergeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-/**
- * MaxFileMergeFileSelector selects the most files from given seqFiles and unseqFiles which can be
- * merged without exceeding given memory budget. It always assume the number of timeseries being
- * queried at the same time is 1 to maximize the number of file merged.
- */
-public class RewriteCompactionFileSelector implements ICrossSpaceCompactionFileSelector {
- private static final Logger logger =
- LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-
- private final CrossSpaceCompactionResource resource;
-
- private long totalCost;
- private final long memoryBudget;
- private final int maxCrossCompactionFileNum;
-
- private List<TsFileResource> selectedUnseqFiles;
- private List<TsFileResource> selectedSeqFiles;
-
- private Collection<Integer> tmpSelectedSeqFiles;
-
- private boolean[] seqSelected;
- private int seqSelectedNum;
-
- private AbstractCompactionEstimator compactionEstimator;
-
- public RewriteCompactionFileSelector(CrossSpaceCompactionResource resource, long memoryBudget) {
- this.resource = resource;
- this.memoryBudget = memoryBudget;
- this.maxCrossCompactionFileNum =
- IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum();
- this.compactionEstimator =
- ICompactionSelector.getCompactionEstimator(
- IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer(), false);
- }
-
- /**
- * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget. This process
- * iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles as newly-added
- * candidates and computes their estimated memory cost. If the current cost pluses the new cost is
- * still under the budget, accept the unseqFile and the seqFiles as candidates, otherwise go to
- * the next iteration. The memory cost of a file is calculated in two ways: The rough estimation:
- * for a seqFile, the size of its metadata is used for estimation. Since in the worst case, the
- * file only contains one timeseries and all its metadata will be loaded into memory with at most
- * one actual data chunk (which is negligible) and writing the timeseries into a new file generate
- * metadata of the similar size, so the size of all seqFiles' metadata (generated when writing new
- * chunks) pluses the largest one (loaded when reading a timeseries from the seqFiles) is the
- * total estimation of all seqFiles; for an unseqFile, since the merge reader may read all chunks
- * of a series to perform a merge read, the whole file may be loaded into memory, so we use the
- * file's length as the maximum estimation. The tight estimation: based on the rough estimation,
- * we scan the file's metadata to count the number of chunks for each series, find the series
- * which have the most chunks in the file and use its chunk proportion to refine the rough
- * estimation. The rough estimation is performed first, if no candidates can be found using rough
- * estimation, we run the selection again with tight estimation.
- *
- * @return two lists of TsFileResource, the former is selected seqFiles and the latter is selected
- * unseqFiles or an empty array if there are no proper candidates by the budget.
- */
- @Override
- public List[] select() throws MergeException {
- long startTime = System.currentTimeMillis();
- try {
- logger.debug(
- "Selecting merge candidates from {} seqFile, {} unseqFiles",
- resource.getSeqFiles().size(),
- resource.getUnseqFiles().size());
- selectSourceFiles();
- if (selectedUnseqFiles.isEmpty()) {
- logger.debug("No merge candidates are found");
- return new List[0];
- }
- } catch (IOException e) {
- throw new MergeException(e);
- } finally {
- try {
- compactionEstimator.clear();
- } catch (IOException e) {
- throw new MergeException(e);
- }
- }
- logger.info(
- "Selected merge candidates, {} seqFiles, {} unseqFiles, total memory cost {}, "
- + "time consumption {}ms",
- selectedSeqFiles.size(),
- selectedUnseqFiles.size(),
- totalCost,
- System.currentTimeMillis() - startTime);
-
- return new List[] {selectedSeqFiles, selectedUnseqFiles};
- }
-
- /**
- * In a preset time (30 seconds), for each unseqFile, find the list of seqFiles that overlap with
- * it and have not been selected by the file selector of this compaction task. After finding each
- * unseqFile and its corresponding overlap seqFile list, estimate the additional memory overhead
- * that may be added by compacting them (preferably using the loop estimate), and if it does not
- * exceed the memory overhead preset by the system for the compaction thread, put them into the
- * selectedSeqFiles and selectedUnseqFiles.
- */
- void selectSourceFiles() throws IOException {
- tmpSelectedSeqFiles = new HashSet<>();
- seqSelected = new boolean[resource.getSeqFiles().size()];
- seqSelectedNum = 0;
- selectedSeqFiles = new ArrayList<>();
- selectedUnseqFiles = new ArrayList<>();
-
- totalCost = 0;
-
- int unseqIndex = 0;
- long startTime = System.currentTimeMillis();
- long timeConsumption = 0;
- long timeLimit =
- IoTDBDescriptor.getInstance().getConfig().getCrossCompactionFileSelectionTimeBudget();
- if (timeLimit < 0) {
- timeLimit = Long.MAX_VALUE;
- }
- while (unseqIndex < resource.getUnseqFiles().size() && timeConsumption < timeLimit) {
- // select next unseq files
- TsFileResource unseqFile = resource.getUnseqFiles().get(unseqIndex);
- if (!unseqFile.getTsFile().exists() || unseqFile.isDeleted()) {
- break;
- }
-
- if (seqSelectedNum != resource.getSeqFiles().size()) {
- selectOverlappedSeqFiles(unseqFile);
- }
- boolean isSeqFilesValid = checkIsSeqFilesValid();
- if (!isSeqFilesValid) {
- tmpSelectedSeqFiles.clear();
- break;
- }
-
- // Filter out the selected seq files
- for (int i = 0; i < seqSelected.length; i++) {
- if (seqSelected[i]) {
- tmpSelectedSeqFiles.remove(i);
- }
- }
-
- List<TsFileResource> tmpSelectedSeqFileResources = new ArrayList<>();
- for (int seqIndex : tmpSelectedSeqFiles) {
- tmpSelectedSeqFileResources.add(resource.getSeqFiles().get(seqIndex));
- }
- long newCost =
- compactionEstimator.estimateCrossCompactionMemory(tmpSelectedSeqFileResources, unseqFile);
- if (!updateSelectedFiles(newCost, unseqFile)) {
- // older unseq files must be merged before newer ones
- break;
- }
-
- tmpSelectedSeqFiles.clear();
- unseqIndex++;
- timeConsumption = System.currentTimeMillis() - startTime;
- }
- for (int i = 0; i < seqSelected.length; i++) {
- if (seqSelected[i]) {
- selectedSeqFiles.add(resource.getSeqFiles().get(i));
- }
- }
- }
-
- private boolean updateSelectedFiles(long newCost, TsFileResource unseqFile) {
- if (selectedUnseqFiles.size() == 0
- || (seqSelectedNum + selectedUnseqFiles.size() + 1 + tmpSelectedSeqFiles.size()
- <= maxCrossCompactionFileNum
- && totalCost + newCost < memoryBudget)) {
- selectedUnseqFiles.add(unseqFile);
-
- for (Integer seqIdx : tmpSelectedSeqFiles) {
- if (!seqSelected[seqIdx]) {
- seqSelectedNum++;
- seqSelected[seqIdx] = true;
- }
- }
- totalCost += newCost;
- logger.debug(
- "Adding a new unseqFile {} and seqFiles {} as candidates, new cost {}, total"
- + " cost {}",
- unseqFile,
- tmpSelectedSeqFiles,
- newCost,
- totalCost);
- return true;
- }
- return false;
- }
-
- /**
- * To avoid redundant data in seq files, cross space compaction should select all the seq files
- * which have overlap with unseq files whether they are compacting or not. Therefore, before
- * adding task into the queue, cross space compaction task should check whether source seq files
- * are being compacted or not to speed up compaction.
- */
- private boolean checkIsSeqFilesValid() {
- for (Integer seqIdx : tmpSelectedSeqFiles) {
- if (resource.getSeqFiles().get(seqIdx).isCompactionCandidate()
- || resource.getSeqFiles().get(seqIdx).isCompacting()
- || !resource.getSeqFiles().get(seqIdx).isClosed()
- || !resource.getSeqFiles().get(seqIdx).getTsFile().exists()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Put the index of the seqFile that has an overlap with the specific unseqFile and has not been
- * selected by the file selector of the compaction task into the tmpSelectedSeqFiles list. To
- * determine whether overlap exists is to traverse each device ChunkGroup in unseqFiles, and
- * determine whether it overlaps with the same device ChunkGroup of each seqFile that are not
- * selected by the compaction task, if so, select this seqFile.
- *
- * @param unseqFile the tsFileResource of unseqFile to be compacted
- */
- private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
- for (String deviceId : unseqFile.getDevices()) {
- long unseqStartTime = unseqFile.getStartTime(deviceId);
- long unseqEndTime = unseqFile.getEndTime(deviceId);
-
- boolean noMoreOverlap = false;
- for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
- TsFileResource seqFile = resource.getSeqFiles().get(i);
- if (!seqFile.mayContainsDevice(deviceId)) {
- continue;
- }
-
- long seqEndTime = seqFile.getEndTime(deviceId);
- long seqStartTime = seqFile.getStartTime(deviceId);
- if (!seqFile.isClosed()) {
- // for unclosed file, only select those that overlap with the unseq file
- if (unseqEndTime >= seqStartTime) {
- tmpSelectedSeqFiles.add(i);
- }
- } else if (unseqEndTime <= seqEndTime) {
- // if time range in unseq file is 10-20, seq file is 30-40, or
- // time range in unseq file is 10-20, seq file is 15-25, then select this seq file and
- // there is no more overlap later.
- tmpSelectedSeqFiles.add(i);
- noMoreOverlap = true;
- } else if (unseqStartTime <= seqEndTime) {
- // if time range in unseq file is 10-20, seq file is 0-15, then select this seq file and
- // there may be overlap later.
- tmpSelectedSeqFiles.add(i);
- }
- }
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/InplaceCompactionEstimator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/InplaceCompactionEstimator.java
index 5f195db339..9032c5d9e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/InplaceCompactionEstimator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/InplaceCompactionEstimator.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.cross.utils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.cross.AbstractCrossSpaceEstimator;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -35,7 +34,7 @@ import java.util.List;
import java.util.Map;
public class InplaceCompactionEstimator extends AbstractCrossSpaceEstimator {
- private static final Logger logger = LoggerFactory.getLogger(RewriteCompactionFileSelector.class);
+ private static final Logger logger = LoggerFactory.getLogger(InplaceCompactionEstimator.class);
private static final String LOG_FILE_COST = "Memory cost of file {} is {}";
private boolean tightEstimate;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/ICompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/ICompactionSelector.java
index 72063d5578..08983cc7d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/ICompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/ICompactionSelector.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.cross.utils.ReadPointCrossCompactio
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.util.Collections;
import java.util.List;
/**
@@ -55,6 +56,10 @@ public interface ICompactionSelector {
}
}
+ default List<Long> getCompactionMemoryCost() {
+ return Collections.emptyList();
+ }
+
static AbstractCompactionEstimator getCompactionEstimator(
CrossCompactionPerformer compactionPerformer, boolean isInnerSpace) {
switch (compactionPerformer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index 0f5b667c49..b156483116 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -48,7 +48,7 @@ public class PrimitiveArrayManager {
/** threshold total size of arrays for all data types */
private static final double POOLED_ARRAYS_MEMORY_THRESHOLD =
- CONFIG.getAllocateMemoryForWrite()
+ CONFIG.getAllocateMemoryForStorageEngine()
* CONFIG.getBufferedArraysMemoryProportion()
/ AMPLIFICATION_FACTOR;
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 88b639a82a..8c51e9eef5 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
public class SystemInfo {
@@ -43,10 +44,15 @@ public class SystemInfo {
private long totalStorageGroupMemCost = 0L;
private volatile boolean rejected = false;
- private static long memorySizeForWrite = config.getAllocateMemoryForWrite();
+ private static long memorySizeForWrite =
+ (long) (config.getAllocateMemoryForStorageEngine() * config.getWriteProportion());
+ private static long memorySizeForCompaction =
+ (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion());
+
private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
private long flushingMemTablesCost = 0L;
+ private AtomicLong compactionMemoryCost = new AtomicLong(0L);
private ExecutorService flushTaskSubmitThreadPool =
IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
@@ -170,6 +176,27 @@ public class SystemInfo {
this.flushingMemTablesCost -= flushingMemTableCost;
}
+ public void addCompactionMemoryCost(long memoryCost) throws InterruptedException {
+ long originSize = this.compactionMemoryCost.get();
+ while (originSize + memoryCost > memorySizeForCompaction
+ || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) {
+ Thread.sleep(100);
+ originSize = this.compactionMemoryCost.get();
+ }
+ }
+
+ public synchronized void resetCompactionMemoryCost(long compactionMemoryCost) {
+ this.compactionMemoryCost.addAndGet(-compactionMemoryCost);
+ }
+
+ public long getMemorySizeForCompaction() {
+ return memorySizeForCompaction;
+ }
+
+ public void setMemorySizeForCompaction(long size) {
+ memorySizeForCompaction = size;
+ }
+
private void logCurrentTotalSGMemory() {
logger.debug("Current Sg cost is {}", totalStorageGroupMemCost);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index 1716a6316d..5bd12b8d0d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -281,107 +282,116 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCrossCompactionMemoryBudget(2 * 1024 * 1024L * 1024L);
- String sgName = COMPACTION_TEST_SG + "test2";
- try {
- IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
- } catch (Exception e) {
- logger.error("exception occurs", e);
- }
+ long origin = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 2
+ * 1024
+ * 1024L
+ * 1024L
+ * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
try {
- CompactionTaskManager.getInstance().restart();
- TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
- Set<String> fullPath = new HashSet<>();
- for (String device : fullPaths) {
- fullPath.add(sgName + device);
- PartialPath path = new PartialPath(sgName + device);
- IoTDB.schemaProcessor.createTimeseries(
- path,
- TSDataType.INT64,
- TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
- Collections.emptyMap());
- }
- for (int i = 0; i < 100; i++) {
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- tsFileManager.add(tsFileResource, true);
- }
- for (int i = 0; i < 100; i++) {
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
- tsFileManager.add(tsFileResource, false);
+ String sgName = COMPACTION_TEST_SG + "test2";
+ try {
+ IoTDB.schemaProcessor.setStorageGroup(new PartialPath(sgName));
+ } catch (Exception e) {
+ logger.error("exception occurs", e);
}
+ try {
+ CompactionTaskManager.getInstance().restart();
+ TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
+ Set<String> fullPath = new HashSet<>();
+ for (String device : fullPaths) {
+ fullPath.add(sgName + device);
+ PartialPath path = new PartialPath(sgName + device);
+ IoTDB.schemaProcessor.createTimeseries(
+ path,
+ TSDataType.INT64,
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ }
+ for (int i = 0; i < 100; i++) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add(100L);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource tsFileResource =
+ CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, sgName);
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
+ tsFileManager.add(tsFileResource, true);
+ }
+ for (int i = 0; i < 100; i++) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add(100L);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource tsFileResource =
+ CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, sgName);
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
+ tsFileManager.add(tsFileResource, false);
+ }
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- long totalWaitingTime = 0;
- while (tsFileManager.getTsFileList(false).size() > 1) {
- try {
- Thread.sleep(100);
- totalWaitingTime += 100;
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- if (totalWaitingTime > MAX_WAITING_TIME) {
- fail();
- break;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ long totalWaitingTime = 0;
+ while (tsFileManager.getTsFileList(false).size() > 1) {
+ try {
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ if (totalWaitingTime > MAX_WAITING_TIME) {
+ fail();
+ break;
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- } catch (InterruptedException e) {
- e.printStackTrace();
}
- }
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- totalWaitingTime = 0;
- while (tsFileManager.getTsFileList(false).size() > 0) {
- try {
- Thread.sleep(10);
- totalWaitingTime += 10;
- if (totalWaitingTime > MAX_WAITING_TIME) {
- fail();
- break;
- }
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ totalWaitingTime = 0;
+ while (tsFileManager.getTsFileList(false).size() > 0) {
+ try {
+ Thread.sleep(10);
+ totalWaitingTime += 10;
+ if (totalWaitingTime > MAX_WAITING_TIME) {
+ fail();
+ break;
+ }
+ if (totalWaitingTime % 10_000 == 0) {
+ logger.warn(
+ "sequence file num is {}, unsequence file num is {}",
+ tsFileManager.getTsFileList(true).size(),
+ tsFileManager.getTsFileList(false).size());
+ }
+ if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
+ logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- } catch (InterruptedException e) {
- e.printStackTrace();
}
+ // assertEquals(100, tsFileManager.getTsFileList(true).size());
+ tsFileManager.setAllowCompaction(false);
+ stopCompactionTaskManager();
+ } finally {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setConcurrentCompactionThread(prevCompactionConcurrentThread);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
}
- // assertEquals(100, tsFileManager.getTsFileList(true).size());
- tsFileManager.setAllowCompaction(false);
- stopCompactionTaskManager();
} finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setConcurrentCompactionThread(prevCompactionConcurrentThread);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
+ SystemInfo.getInstance().setMemorySizeForCompaction(origin);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
index 8b85696e78..d83a019168 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
@@ -365,6 +365,7 @@ public class CompactionTaskComparatorTest {
selectedUnsequenceFiles,
new ReadPointCompactionPerformer(),
currentTaskNum,
+ 0,
serialId);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index eda36fca69..8ddd23d3f4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -297,6 +297,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
unseqResources,
new ReadPointCompactionPerformer(),
new AtomicInteger(0),
+ 0,
0);
for (TsFileResource resource : seqResources) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
index 766352659e..47a86d215c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
@@ -26,8 +26,7 @@ import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionResource;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceCompactionFileSelector;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.RewriteCrossSpaceCompactionSelector;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils;
import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
@@ -423,11 +422,12 @@ public class CrossSpaceCompactionTest {
CrossSpaceCompactionResource mergeResource =
new CrossSpaceCompactionResource(
seqTsFileResourceList, unseqTsFileResourceList, timeLowerBound);
- ICrossSpaceCompactionFileSelector fileSelector =
- new RewriteCompactionFileSelector(mergeResource, Long.MAX_VALUE);
- List[] mergeFiles = fileSelector.select();
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqTsFileResourceList, unseqTsFileResourceList);
index++;
- if (mergeFiles.length > 0) {
+ if (selected.size() > 0) {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
@@ -442,6 +442,7 @@ public class CrossSpaceCompactionTest {
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
0);
compactionTask.start();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
@@ -729,10 +730,11 @@ public class CrossSpaceCompactionTest {
CrossSpaceCompactionResource mergeResource =
new CrossSpaceCompactionResource(
seqTsFileResourceList, unseqTsFileResourceList, timeLowerBound);
- ICrossSpaceCompactionFileSelector fileSelector =
- new RewriteCompactionFileSelector(mergeResource, Long.MAX_VALUE);
- List[] mergeFiles = fileSelector.select();
- if (mergeFiles.length > 0) {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqTsFileResourceList, unseqTsFileResourceList);
+ if (selected.size() > 0) {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
@@ -747,6 +749,7 @@ public class CrossSpaceCompactionTest {
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
0);
compactionTask.start();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
@@ -1033,10 +1036,11 @@ public class CrossSpaceCompactionTest {
CrossSpaceCompactionResource mergeResource =
new CrossSpaceCompactionResource(
seqTsFileResourceList, unseqTsFileResourceList, timeLowerBound);
- ICrossSpaceCompactionFileSelector fileSelector =
- new RewriteCompactionFileSelector(mergeResource, Long.MAX_VALUE);
- List[] mergeFiles = fileSelector.select();
- if (mergeFiles.length > 0) {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqTsFileResourceList, unseqTsFileResourceList);
+ if (selected.size() > 0) {
AbstractCompactionTask compactionTask =
new CrossSpaceCompactionTask(
0,
@@ -1051,6 +1055,7 @@ public class CrossSpaceCompactionTest {
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
0);
compactionTask.start();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
index 20003aec60..577c59cfc7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
@@ -23,8 +23,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionResource;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceCompactionFileSelector;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.RewriteCrossSpaceCompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
@@ -34,6 +33,7 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.After;
import org.junit.Assert;
@@ -84,29 +84,28 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
- CrossSpaceCompactionResource resource =
- new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(1, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+ Assert.assertEquals(1, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -131,28 +130,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(1, result[0].size());
- Assert.assertEquals(2, result[1].size());
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
+ Assert.assertEquals(1, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -177,28 +178,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -229,30 +232,32 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(4, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
- Assert.assertEquals(result[1].get(2), unseqResources.get(2));
- Assert.assertEquals(result[1].get(3), unseqResources.get(3));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(4, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
+ Assert.assertEquals(selected.get(0).right.get(2), unseqResources.get(2));
+ Assert.assertEquals(selected.get(0).right.get(3), unseqResources.get(3));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -279,31 +284,33 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(5, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[0].get(2), seqResources.get(3));
- Assert.assertEquals(result[0].get(3), seqResources.get(4));
- Assert.assertEquals(result[0].get(4), seqResources.get(5));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(5, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(5));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -330,29 +337,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(1, result[0].size());
- Assert.assertEquals(3, result[1].size());
- for (TsFileResource selectedResource : (List<TsFileResource>) result[0]) {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(1, selected.get(0).left.size());
+ Assert.assertEquals(3, selected.get(0).right.size());
+ for (TsFileResource selectedResource : (List<TsFileResource>) selected.get(0).left) {
Assert.assertEquals(selectedResource, seqResources.get(1));
}
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
- Assert.assertEquals(result[1].get(2), unseqResources.get(2));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
+ Assert.assertEquals(selected.get(0).right.get(2), unseqResources.get(2));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -380,31 +389,33 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(4, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[0].get(2), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
- Assert.assertEquals(result[1].get(2), unseqResources.get(2));
- Assert.assertEquals(result[1].get(3), unseqResources.get(3));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(4, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
+ Assert.assertEquals(selected.get(0).right.get(2), unseqResources.get(2));
+ Assert.assertEquals(selected.get(0).right.get(3), unseqResources.get(3));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -431,31 +442,33 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(4, result[1].size());
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(4, selected.get(0).right.size());
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
- Assert.assertEquals(result[1].get(2), unseqResources.get(2));
- Assert.assertEquals(result[1].get(3), unseqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
+ Assert.assertEquals(selected.get(0).right.get(2), unseqResources.get(2));
+ Assert.assertEquals(selected.get(0).right.get(3), unseqResources.get(3));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -485,28 +498,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -537,28 +552,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -590,27 +607,29 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(4));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -641,28 +660,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -694,28 +715,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -748,28 +771,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(4));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -801,29 +826,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[0].get(2), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -856,29 +883,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[0].get(2), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -912,29 +941,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -967,30 +998,32 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(5, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[0].get(3), seqResources.get(5));
- Assert.assertEquals(result[0].get(4), seqResources.get(6));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(5, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(5));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(6));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1023,30 +1056,32 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(5, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[0].get(3), seqResources.get(5));
- Assert.assertEquals(result[0].get(4), seqResources.get(6));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(5, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(5));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(6));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1079,30 +1114,32 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(6, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[0].get(3), seqResources.get(5));
- Assert.assertEquals(result[0].get(4), seqResources.get(6));
- Assert.assertEquals(result[0].get(5), seqResources.get(7));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(6, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(5));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(6));
+ Assert.assertEquals(selected.get(0).left.get(5), seqResources.get(7));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1135,29 +1172,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(5, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[0].get(3), seqResources.get(5));
- Assert.assertEquals(result[0].get(4), seqResources.get(7));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(5, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(5));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(7));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1187,28 +1226,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1239,28 +1280,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1292,27 +1335,29 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(4));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1343,28 +1388,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1396,28 +1443,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1450,28 +1499,30 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(4));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1503,29 +1554,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[0].get(2), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1558,29 +1611,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[0].get(2), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1614,29 +1669,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(2, result[1].size());
-
- Assert.assertEquals(result[0].get(0), seqResources.get(1));
- Assert.assertEquals(result[0].get(1), seqResources.get(2));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(1));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1669,30 +1726,32 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(5, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[0].get(3), seqResources.get(5));
- Assert.assertEquals(result[0].get(4), seqResources.get(6));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(5, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(5));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(6));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1725,30 +1784,32 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(5, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[0].get(3), seqResources.get(5));
- Assert.assertEquals(result[0].get(4), seqResources.get(6));
-
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(5, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(5));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(6));
+
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1781,30 +1842,32 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(6, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[0].get(3), seqResources.get(5));
- Assert.assertEquals(result[0].get(4), seqResources.get(6));
- Assert.assertEquals(result[0].get(5), seqResources.get(7));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(6, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(5));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(6));
+ Assert.assertEquals(selected.get(0).left.get(5), seqResources.get(7));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1837,29 +1900,31 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(5, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[0].get(2), seqResources.get(4));
- Assert.assertEquals(result[0].get(3), seqResources.get(5));
- Assert.assertEquals(result[0].get(4), seqResources.get(7));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(5, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).left.get(2), seqResources.get(4));
+ Assert.assertEquals(selected.get(0).left.get(3), seqResources.get(5));
+ Assert.assertEquals(selected.get(0).left.get(4), seqResources.get(7));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1893,26 +1958,28 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1948,20 +2015,22 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
// Assert.assertEquals(0, result.length);
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -1996,26 +2065,28 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -2050,27 +2121,29 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
- Assert.assertEquals(result[1].get(1), unseqResources.get(1));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
+ Assert.assertEquals(selected.get(0).right.get(1), unseqResources.get(1));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
@@ -2106,26 +2179,28 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(1, result[1].size());
- Assert.assertEquals(result[0].get(0), seqResources.get(2));
- Assert.assertEquals(result[0].get(1), seqResources.get(3));
- Assert.assertEquals(result[1].get(0), unseqResources.get(0));
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ Assert.assertEquals(selected.get(0).left.get(0), seqResources.get(2));
+ Assert.assertEquals(selected.get(0).left.get(1), seqResources.get(3));
+ Assert.assertEquals(selected.get(0).right.get(0), unseqResources.get(0));
new CrossSpaceCompactionTask(
0,
tsFileManager,
- result[0],
- result[1],
+ selected.get(0).left,
+ selected.get(0).right,
IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
new AtomicInteger(0),
+ 0,
tsFileManager.getNextCompactionTaskId())
.doCompaction();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeUpgradeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeUpgradeTest.java
index 6e81c159b5..559abd7a12 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeUpgradeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/MergeUpgradeTest.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.db.engine.compaction.cross;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.constant.TestConstant;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionResource;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.RewriteCrossSpaceCompactionSelector;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
+import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -79,12 +80,14 @@ public class MergeUpgradeTest {
@Test
public void testMergeUpgradeSelect() throws MergeException {
- CrossSpaceCompactionResource resource =
- new CrossSpaceCompactionResource(seqResources, unseqResources);
- RewriteCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- assertEquals(0, result.length);
+ TsFileManager tsFileManager = new TsFileManager("", "", "");
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, true);
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, tsFileManager);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+ assertEquals(0, selected.size());
}
private void prepareFiles() throws IOException, WriteProcessException {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
index 2b9a9d0fd4..8e970d8cba 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
@@ -23,14 +23,15 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionResource;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceCompactionFileSelector;
-import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
+import org.apache.iotdb.db.engine.compaction.cross.rewrite.RewriteCrossSpaceCompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -51,7 +52,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
public class RewriteCompactionFileSelectorTest extends MergeTest {
private static final Logger logger =
@@ -59,29 +59,26 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
@Test
public void testFullSelection() throws MergeException, IOException {
- CrossSpaceCompactionResource resource =
- new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+ List<TsFileResource> seqSelected = selected.get(0).left;
+ List<TsFileResource> unseqSelected = selected.get(0).right;
assertEquals(seqResources, seqSelected);
assertEquals(unseqResources, unseqSelected);
- resource = new CrossSpaceCompactionResource(seqResources.subList(0, 1), unseqResources);
- mergeFileSelector = new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- result = mergeFileSelector.select();
- seqSelected = result[0];
- unseqSelected = result[1];
+ selector = new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ selected = selector.selectCrossSpaceTask(seqResources.subList(0, 1), unseqResources);
+ seqSelected = selected.get(0).left;
+ unseqSelected = selected.get(0).right;
assertEquals(seqResources.subList(0, 1), seqSelected);
assertEquals(unseqResources, unseqSelected);
- resource = new CrossSpaceCompactionResource(seqResources, unseqResources.subList(0, 1));
- mergeFileSelector = new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- result = mergeFileSelector.select();
- seqSelected = result[0];
- unseqSelected = result[1];
+ selector = new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ selected = selector.selectCrossSpaceTask(seqResources, unseqResources.subList(0, 1));
+ seqSelected = selected.get(0).left;
+ unseqSelected = selected.get(0).right;
assertEquals(seqResources.subList(0, 1), seqSelected);
assertEquals(unseqResources.subList(0, 1), unseqSelected);
}
@@ -90,21 +87,23 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
public void testWithFewMemoryBudgeSelection() throws MergeException, IOException {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, 1);
- List[] result = mergeFileSelector.select();
- assertEquals(2, result.length);
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+ assertEquals(1, selected.size());
}
@Test
public void testRestrictedSelection() throws MergeException, IOException {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, 400000);
- List[] result = mergeFileSelector.select();
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+ List<TsFileResource> seqSelected = selected.get(0).left;
+ List<TsFileResource> unseqSelected = selected.get(0).right;
assertEquals(seqResources.subList(0, 5), seqSelected);
assertEquals(unseqResources.subList(0, 6), unseqSelected);
}
@@ -155,12 +154,11 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
List<TsFileResource> newUnseqResources = new ArrayList<>();
newUnseqResources.add(largeUnseqTsFileResource);
- CrossSpaceCompactionResource resource =
- new CrossSpaceCompactionResource(seqResources, newUnseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- assertEquals(0, result.length);
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, newUnseqResources);
+ assertEquals(0, selected.size());
}
/**
@@ -247,10 +245,11 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
CrossSpaceCompactionResource resource =
new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- assertEquals(2, result[0].size());
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+ assertEquals(2, selected.get(0).left.size());
}
@Test
@@ -305,21 +304,27 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
CrossSpaceCompactionResource resource = new CrossSpaceCompactionResource(seqList, unseqList);
// the budget is enough to select unseq0 and unseq2, but not unseq1
// the first selection should only contain seq0 and unseq0
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, 29000);
- List[] result = mergeFileSelector.select();
- assertEquals(1, result[0].size());
- assertEquals(1, result[1].size());
- assertEquals(seqList.get(0), result[0].get(0));
- assertEquals(unseqList.get(0), result[1].get(0));
-
- resource =
- new CrossSpaceCompactionResource(
- seqList.subList(1, seqList.size()), unseqList.subList(1, unseqList.size()));
- // Although memory is out of memoryBudget, at least one unseq file should be selected
- mergeFileSelector = new RewriteCompactionFileSelector(resource, 29000);
- result = mergeFileSelector.select();
- assertEquals(2, result.length);
+ long originMemoryBudget = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 29000L * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqList, unseqList);
+ assertEquals(1, selected.get(0).left.size());
+ assertEquals(1, selected.get(0).right.size());
+ assertEquals(seqList.get(0), selected.get(0).left.get(0));
+ assertEquals(unseqList.get(0), selected.get(0).right.get(0));
+
+ selected =
+ selector.selectCrossSpaceTask(
+ seqList.subList(1, seqList.size()), unseqList.subList(1, unseqList.size()));
+ assertEquals(1, selected.size());
+ } finally {
+ SystemInfo.getInstance().setMemorySizeForCompaction(originMemoryBudget);
+ }
} finally {
removeFiles(seqList, unseqList);
@@ -381,12 +386,24 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
CrossSpaceCompactionResource resource = new CrossSpaceCompactionResource(seqList, unseqList);
Assert.assertEquals(5, resource.getSeqFiles().size());
Assert.assertEquals(10, resource.getUnseqFiles().size());
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, 500 * 1024 * 1024);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(1, result[0].size());
- Assert.assertEquals(10, result[1].size());
+ long origin = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 500L
+ * 1024
+ * 1024
+ * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqList, unseqList);
+ Assert.assertEquals(1, selected.size());
+ Assert.assertEquals(1, selected.get(0).left.size());
+ Assert.assertEquals(10, selected.get(0).right.size());
+ } finally {
+ SystemInfo.getInstance().setMemorySizeForCompaction(origin);
+ }
}
/**
@@ -444,12 +461,24 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
CrossSpaceCompactionResource resource = new CrossSpaceCompactionResource(seqList, unseqList);
Assert.assertEquals(5, resource.getSeqFiles().size());
Assert.assertEquals(1, resource.getUnseqFiles().size());
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, 500 * 1024 * 1024);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(1, result[0].size());
- Assert.assertEquals(1, result[1].size());
+ long origin = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 500L
+ * 1024
+ * 1024
+ * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqList, unseqList);
+ Assert.assertEquals(1, selected.size());
+ Assert.assertEquals(1, selected.get(0).left.size());
+ Assert.assertEquals(1, selected.get(0).right.size());
+ } finally {
+ SystemInfo.getInstance().setMemorySizeForCompaction(origin);
+ }
}
/**
@@ -507,12 +536,24 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
CrossSpaceCompactionResource resource = new CrossSpaceCompactionResource(seqList, unseqList);
Assert.assertEquals(5, resource.getSeqFiles().size());
Assert.assertEquals(2, resource.getUnseqFiles().size());
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, 500 * 1024 * 1024);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(2, result[1].size());
+ long origin = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 500L
+ * 1024
+ * 1024
+ * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqList, unseqList);
+ Assert.assertEquals(1, selected.size());
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ } finally {
+ SystemInfo.getInstance().setMemorySizeForCompaction(origin);
+ }
}
/**
@@ -573,12 +614,24 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
CrossSpaceCompactionResource resource = new CrossSpaceCompactionResource(seqList, unseqList);
Assert.assertEquals(5, resource.getSeqFiles().size());
Assert.assertEquals(4, resource.getUnseqFiles().size());
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, 500 * 1024 * 1024);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(5, result[0].size());
- Assert.assertEquals(4, result[1].size());
+ long origin = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 500L
+ * 1024
+ * 1024
+ * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqList, unseqList);
+ Assert.assertEquals(1, selected.size());
+ Assert.assertEquals(5, selected.get(0).left.size());
+ Assert.assertEquals(4, selected.get(0).right.size());
+ } finally {
+ SystemInfo.getInstance().setMemorySizeForCompaction(origin);
+ }
}
/**
@@ -642,12 +695,24 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
CrossSpaceCompactionResource resource = new CrossSpaceCompactionResource(seqList, unseqList);
Assert.assertEquals(5, resource.getSeqFiles().size());
Assert.assertEquals(4, resource.getUnseqFiles().size());
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, 500 * 1024 * 1024);
- List[] result = mergeFileSelector.select();
- Assert.assertEquals(2, result.length);
- Assert.assertEquals(3, result[0].size());
- Assert.assertEquals(2, result[1].size());
+ long origin = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 500L
+ * 1024
+ * 1024
+ * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqList, unseqList);
+ Assert.assertEquals(1, selected.size());
+ Assert.assertEquals(3, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ } finally {
+ SystemInfo.getInstance().setMemorySizeForCompaction(origin);
+ }
}
@Test
@@ -862,13 +927,24 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
fileWriter.flushAllChunkGroups();
fileWriter.close();
- CrossSpaceCompactionResource compactionResource =
- new CrossSpaceCompactionResource(seqList, unseqList);
- RewriteCompactionFileSelector selector =
- new RewriteCompactionFileSelector(compactionResource, 500 * 1024 * 1024);
- List[] result = selector.select();
- Assert.assertEquals(2, result[0].size());
- Assert.assertEquals(2, result[1].size());
+ long origin = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 500L
+ * 1024
+ * 1024
+ * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
+ try {
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqList, unseqList);
+ Assert.assertEquals(1, selected.size());
+ Assert.assertEquals(2, selected.get(0).left.size());
+ Assert.assertEquals(2, selected.get(0).right.size());
+ } finally {
+ SystemInfo.getInstance().setMemorySizeForCompaction(origin);
+ }
}
@Test
@@ -876,14 +952,13 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
int oldMaxCrossCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCrossCompactionCandidateFileNum(5);
- CrossSpaceCompactionResource resource =
- new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- assertEquals(2, result.length);
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+ assertEquals(1, selected.size());
+ List<TsFileResource> seqSelected = selected.get(0).left;
+ List<TsFileResource> unseqSelected = selected.get(0).right;
assertEquals(2, seqSelected.size());
assertEquals(2, unseqSelected.size());
@@ -898,13 +973,13 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCrossCompactionCandidateFileNum(1);
- CrossSpaceCompactionResource resource =
- new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
- List[] result = mergeFileSelector.select();
- List<TsFileResource> seqSelected = result[0];
- List<TsFileResource> unseqSelected = result[1];
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
+ assertEquals(1, selected.size());
+ List<TsFileResource> seqSelected = selected.get(0).left;
+ List<TsFileResource> unseqSelected = selected.get(0).right;
assertEquals(1, seqSelected.size());
assertEquals(1, unseqSelected.size());
@@ -915,16 +990,15 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
@Test
public void testDeleteInSelection() throws Exception {
- CrossSpaceCompactionResource resource =
- new CrossSpaceCompactionResource(seqResources, unseqResources);
- ICrossSpaceCompactionFileSelector mergeFileSelector =
- new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
+ RewriteCrossSpaceCompactionSelector selector =
+ new RewriteCrossSpaceCompactionSelector("", "", 0, null);
AtomicBoolean fail = new AtomicBoolean(false);
Thread thread1 =
new Thread(
() -> {
try {
- mergeFileSelector.select();
+ List<Pair<List<TsFileResource>, List<TsFileResource>>> selected =
+ selector.selectCrossSpaceTask(seqResources, unseqResources);
} catch (Exception e) {
logger.error("Exception occurs", e);
fail.set(true);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
index c5f3cb132d..40b6b8d17f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
@@ -227,6 +227,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
unseqResources,
new ReadPointCompactionPerformer(),
new AtomicInteger(0),
+ 0,
0);
task.start();
@@ -464,6 +465,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
unseqResources,
new ReadPointCompactionPerformer(),
new AtomicInteger(0),
+ 0,
0);
task.start();
@@ -611,6 +613,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
unseqResources,
new ReadPointCompactionPerformer(),
new AtomicInteger(0),
+ 0,
0);
task.setSourceFilesToCompactionCandidate();
task.checkValidAndSetMerging();
@@ -731,6 +734,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
unseqResources,
new ReadPointCompactionPerformer(),
new AtomicInteger(0),
+ 0,
0);
task.setSourceFilesToCompactionCandidate();
task.checkValidAndSetMerging();