You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/08/29 07:34:16 UTC
[iotdb] 01/01: add memory control framework
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3164-0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7929fb9a6fd1e29cb51bc98d8031e9f853d4594a
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Mon Aug 29 15:33:55 2022 +0800
add memory control framework
---
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 6 +-
.../resources/conf/iotdb-engine.properties | 11 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 40 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 23 +-
.../compaction/cross/CrossCompactionStrategy.java | 6 +-
.../cross/CrossSpaceCompactionTaskFactory.java | 6 +-
.../RewriteCrossSpaceCompactionSelector.java | 14 +-
.../selector/ICrossSpaceMergeFileSelector.java | 2 +
.../selector/RewriteCompactionFileSelector.java | 6 +
.../task/RewriteCrossCompactionRecoverTask.java | 467 ---------------------
.../task/RewriteCrossSpaceCompactionTask.java | 13 +-
.../inner/utils/InnerSpaceCompactionUtils.java | 38 --
.../iotdb/db/rescon/PrimitiveArrayManager.java | 2 +-
.../org/apache/iotdb/db/rescon/SystemInfo.java | 28 +-
.../engine/compaction/CompactionSchedulerTest.java | 13 +-
.../compaction/CompactionTaskManagerTest.java | 3 +-
.../compaction/cross/CrossSpaceCompactionTest.java | 9 +-
.../cross/CrossSpaceCompactionValidationTest.java | 351 ++++++++++++++--
.../cross/RewriteCrossSpaceCompactionTest.java | 12 +-
.../task/FakedCrossSpaceCompactionTaskFactory.java | 47 ---
20 files changed, 454 insertions(+), 643 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index b15635b4a1..cedb74b603 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -298,13 +298,13 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
if (clusterConfig.getReplicationNum() > 1) {
clusterConfig.setMaxMemorySizeForRaftLog(
(long)
- (config.getAllocateMemoryForWrite()
+ (config.getAllocateMemoryForStorageEngine()
* clusterConfig.getRaftLogMemoryProportion()
/ clusterConfig.getReplicationNum()));
// calculate remaining memory allocated for write process
- config.setAllocateMemoryForWrite(
+ config.setAllocateMemoryForStorageEngine(
(long)
- (config.getAllocateMemoryForWrite()
+ (config.getAllocateMemoryForStorageEngine()
* (1 - clusterConfig.getRaftLogMemoryProportion())));
}
return true;
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index ebd1a35242..3387f0107a 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -345,6 +345,10 @@ timestamp_precision=ms
# If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:2
# write_read_schema_free_memory_proportion=4:3:1:2
+# Memory allocation ratio in StorageEngine: Write, Compaction
+# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 8:2 , 7:3
+# storage_engine_memory_proportion=8:2
+
# primitive array size (length of each array) in array pool
# Datatype: int
# primitive_array_size=32
@@ -479,13 +483,6 @@ timestamp_precision=ms
# Datatype: long, Unit: ms
# cross_compaction_file_selection_time_budget=30000
-# How much memory may be used in ONE merge task, 10% of maximum JVM memory by default.
-# This is only a rough estimation, starting from a relatively small value to avoid OOM.
-# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
-# total memory estimation of merge.
-# Datatype: long, Unit: Byte
-# cross_compaction_memory_budget=268435456
-
# How many threads will be set up to perform compaction, 10 by default.
# Set to 1 when less than or equal to 0.
# Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c3e0bdcde9..a3448ba496 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -106,7 +106,7 @@ public class IoTDBConfig {
private int rpcMaxConcurrentClientNum = 65535;
/** Memory allocated for the write process */
- private long allocateMemoryForWrite = Runtime.getRuntime().maxMemory() * 4 / 10;
+ private long allocateMemoryForStorageEngine = Runtime.getRuntime().maxMemory() * 4 / 10;
/** Memory allocated for the read process */
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
@@ -125,6 +125,12 @@ public class IoTDBConfig {
/** Memory allocated proportion for timeIndex */
private double timeIndexMemoryProportion = 0.2;
+ /** The proportion of write memory for write process */
+ private double writeProportion = 0.8;
+
+ /** The proportion of write memory for compaction */
+ private double compactionProportion = 0.2;
+
/** Flush proportion for system */
private double flushProportion = 0.4;
@@ -1531,14 +1537,6 @@ public class IoTDBConfig {
this.chunkBufferPoolEnable = chunkBufferPoolEnable;
}
- public long getCrossCompactionMemoryBudget() {
- return crossCompactionMemoryBudget;
- }
-
- public void setCrossCompactionMemoryBudget(long crossCompactionMemoryBudget) {
- this.crossCompactionMemoryBudget = crossCompactionMemoryBudget;
- }
-
public long getMergeIntervalSec() {
return mergeIntervalSec;
}
@@ -1587,12 +1585,12 @@ public class IoTDBConfig {
this.storageGroupSizeReportThreshold = storageGroupSizeReportThreshold;
}
- public long getAllocateMemoryForWrite() {
- return allocateMemoryForWrite;
+ public long getAllocateMemoryForStorageEngine() {
+ return allocateMemoryForStorageEngine;
}
- public void setAllocateMemoryForWrite(long allocateMemoryForWrite) {
- this.allocateMemoryForWrite = allocateMemoryForWrite;
+ public void setAllocateMemoryForStorageEngine(long allocateMemoryForStorageEngine) {
+ this.allocateMemoryForStorageEngine = allocateMemoryForStorageEngine;
}
public long getAllocateMemoryForSchema() {
@@ -2765,4 +2763,20 @@ public class IoTDBConfig {
public void setSchemaQueryFetchSize(int schemaQueryFetchSize) {
this.schemaQueryFetchSize = schemaQueryFetchSize;
}
+
+ public double getWriteProportion() {
+ return writeProportion;
+ }
+
+ public void setWriteProportion(double writeProportion) {
+ this.writeProportion = writeProportion;
+ }
+
+ public double getCompactionProportion() {
+ return compactionProportion;
+ }
+
+ public void setCompactionProportion(double compactionProportion) {
+ this.compactionProportion = compactionProportion;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index cdb0057a2d..8629974e7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -552,11 +552,6 @@ public class IoTDBDescriptor {
Integer.parseInt(
properties.getProperty(
"upgrade_thread_num", Integer.toString(conf.getUpgradeThreadNum()))));
- conf.setCrossCompactionMemoryBudget(
- Long.parseLong(
- properties.getProperty(
- "cross_compaction_memory_budget",
- Long.toString(conf.getCrossCompactionMemoryBudget()))));
conf.setCrossCompactionFileSelectionTimeBudget(
Long.parseLong(
properties.getProperty(
@@ -1407,7 +1402,7 @@ public class IoTDBDescriptor {
}
long maxMemoryAvailable = Runtime.getRuntime().maxMemory();
if (proportionSum != 0) {
- conf.setAllocateMemoryForWrite(
+ conf.setAllocateMemoryForStorageEngine(
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
conf.setAllocateMemoryForRead(
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
@@ -1417,7 +1412,7 @@ public class IoTDBDescriptor {
}
logger.info("allocateMemoryForRead = {}", conf.getAllocateMemoryForRead());
- logger.info("allocateMemoryForWrite = {}", conf.getAllocateMemoryForWrite());
+ logger.info("allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine());
logger.info("allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema());
conf.setMaxQueryDeduplicatedPathNum(
@@ -1457,6 +1452,7 @@ public class IoTDBDescriptor {
}
}
}
+ initStorageEngineAllocate(properties);
}
@SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
@@ -1548,6 +1544,19 @@ public class IoTDBDescriptor {
"cqlog_buffer_size", Integer.toString(conf.getCqlogBufferSize()))));
}
+ private void initStorageEngineAllocate(Properties properties) {
+ String allocationRatio = properties.getProperty("storage_engine_memory_proportion", "8:2");
+ String[] proportions = allocationRatio.split(":");
+ int proportionForMemTable = Integer.parseInt(proportions[0].trim());
+ int proportionForCompaction = Integer.parseInt(proportions[1].trim());
+ conf.setWriteProportion(
+ ((double) (proportionForMemTable)
+ / (double) (proportionForCompaction + proportionForMemTable)));
+ conf.setCompactionProportion(
+ ((double) (proportionForCompaction)
+ / (double) (proportionForCompaction + proportionForMemTable)));
+ }
+
/** Get default encode algorithm by data type */
public TSEncoding getDefaultEncodingByType(TSDataType dataType) {
switch (dataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java
index 05ae8acf5a..257d1ab3e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossCompactionStrategy.java
@@ -42,7 +42,8 @@ public enum CrossCompactionStrategy {
long timePartitionId,
TsFileManager tsFileManager,
List<TsFileResource> selectedSeqTsFileResourceList,
- List<TsFileResource> selectedUnSeqTsFileResourceList) {
+ List<TsFileResource> selectedUnSeqTsFileResourceList,
+ long memoryCost) {
switch (this) {
case REWRITE_COMPACTION:
default:
@@ -53,7 +54,8 @@ public enum CrossCompactionStrategy {
tsFileManager,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList,
- CompactionTaskManager.currentTaskNum);
+ CompactionTaskManager.currentTaskNum,
+ memoryCost);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTaskFactory.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTaskFactory.java
index 3c98410ef3..1f68587e8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTaskFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTaskFactory.java
@@ -34,7 +34,8 @@ public class CrossSpaceCompactionTaskFactory {
long timePartitionId,
TsFileManager tsFileManager,
List<TsFileResource> selectedSeqTsFileResourceList,
- List<TsFileResource> selectedUnSeqTsFileResourceList) {
+ List<TsFileResource> selectedUnSeqTsFileResourceList,
+ long memoryCost) {
return IoTDBDescriptor.getInstance()
.getConfig()
.getCrossCompactionStrategy()
@@ -44,6 +45,7 @@ public class CrossSpaceCompactionTaskFactory {
timePartitionId,
tsFileManager,
selectedSeqTsFileResourceList,
- selectedUnSeqTsFileResourceList);
+ selectedUnSeqTsFileResourceList,
+ memoryCost);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
index 452e4d0bf5..9599296f15 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +89,9 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa
if (seqFileList.isEmpty() || unSeqFileList.isEmpty()) {
return;
}
- long budget = config.getCrossCompactionMemoryBudget();
+ long budget =
+ SystemInfo.getInstance().getMemorySizeForCompaction()
+ / config.getConcurrentCompactionThread();
long timeLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
CrossSpaceCompactionResource mergeResource =
new CrossSpaceCompactionResource(seqFileList, unSeqFileList, timeLowerBound);
@@ -97,6 +100,7 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa
InnerSpaceCompactionUtils.getCrossSpaceFileSelector(budget, mergeResource);
try {
List[] mergeFiles = fileSelector.select();
+ List<Long> memoryCost = fileSelector.getMemoryCost();
// avoid pending tasks holds the metadata and streams
mergeResource.clear();
if (mergeFiles.length == 0) {
@@ -110,9 +114,10 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa
return;
}
LOGGER.info(
- "select files for cross compaction, sequence files: {}, unsequence files {}",
+ "select files for cross compaction, sequence files: {}, unsequence files {}, memory cost is {}",
mergeFiles[0],
- mergeFiles[1]);
+ mergeFiles[1],
+ memoryCost.get(0));
if (mergeFiles[0].size() > 0 && mergeFiles[1].size() > 0) {
AbstractCompactionTask compactionTask =
@@ -122,7 +127,8 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa
timePartition,
tsFileManager,
mergeFiles[0],
- mergeFiles[1]);
+ mergeFiles[1],
+ memoryCost.get(0));
CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);
LOGGER.info(
"{} [Compaction] submit a task with {} sequence file and {} unseq files",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/ICrossSpaceMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/ICrossSpaceMergeFileSelector.java
index b0d36564ad..70e849f6d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/ICrossSpaceMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/ICrossSpaceMergeFileSelector.java
@@ -31,5 +31,7 @@ public interface ICrossSpaceMergeFileSelector {
List[] select() throws MergeException;
+ List<Long> getMemoryCost();
+
int getConcurrentMergeNum();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index 9c0009a4bb..d4da01db23 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -399,4 +400,9 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
public int getConcurrentMergeNum() {
return concurrentMergeNum;
}
+
+ @Override
+ public List<Long> getMemoryCost() {
+ return Collections.singletonList(totalCost);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java
deleted file mode 100644
index 3c7084b8f8..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java
+++ /dev/null
@@ -1,467 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
-
-import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.CompactionUtils;
-import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
-import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
-import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
-import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
-import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
-import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
-import org.apache.iotdb.db.utils.FileLoaderUtils;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class RewriteCrossCompactionRecoverTask extends RewriteCrossSpaceCompactionTask {
- private final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
- private File compactionLogFile;
-
- public RewriteCrossCompactionRecoverTask(
- String logicalStorageGroupName,
- String virtualStorageGroupName,
- long timePartitionId,
- File logFile,
- AtomicInteger currentTaskNum,
- TsFileManager tsFileManager) {
- super(
- logicalStorageGroupName,
- virtualStorageGroupName,
- timePartitionId,
- tsFileManager,
- null,
- null,
- currentTaskNum);
- this.compactionLogFile = logFile;
- }
-
- @Override
- public void doCompaction() {
- boolean handleSuccess = true;
- LOGGER.info(
- "{} [Compaction][Recover] cross space compaction log is {}",
- fullStorageGroupName,
- compactionLogFile);
- try {
- if (compactionLogFile.exists()) {
- LOGGER.info(
- "{} [Compaction][Recover] cross space compaction log file {} exists, start to recover it",
- fullStorageGroupName,
- compactionLogFile);
- CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(compactionLogFile);
- if (isOldLog()) {
- // log from previous version (<0.13)
- logAnalyzer.analyzeOldCrossCompactionLog();
- } else {
- logAnalyzer.analyze();
- }
- List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos();
- List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos();
-
- // compaction log file is incomplete
- if (targetFileIdentifiers.isEmpty() || sourceFileIdentifiers.isEmpty()) {
- LOGGER.info(
- "{} [Compaction][Recover] incomplete log file, abort recover", fullStorageGroupName);
- return;
- }
-
- // check is all source files existed
- boolean isAllSourcesFileExisted = true;
- for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
- File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
- if (sourceFile == null) {
- isAllSourcesFileExisted = false;
- break;
- }
- }
- if (isAllSourcesFileExisted) {
- if (logAnalyzer.isLogFromOld()) {
- handleSuccess = handleWithAllSourceFilesExistFromOld(targetFileIdentifiers);
- } else {
- handleSuccess =
- handleWithAllSourceFilesExist(targetFileIdentifiers, sourceFileIdentifiers);
- }
- } else {
- if (logAnalyzer.isLogFromOld()) {
- handleSuccess =
- handleWithoutAllSourceFilesExistFromOld(
- targetFileIdentifiers, sourceFileIdentifiers);
- } else {
- handleSuccess = handleWithoutAllSourceFilesExist(sourceFileIdentifiers);
- }
- }
- }
- } catch (IOException e) {
- LOGGER.error("recover cross space compaction error", e);
- } finally {
- if (!handleSuccess) {
- LOGGER.error(
- "{} [Compaction][Recover] Failed to recover cross space compaction, set allowCompaction to false",
- fullStorageGroupName);
- tsFileManager.setAllowCompaction(false);
- } else {
- if (compactionLogFile.exists()) {
- try {
- LOGGER.info(
- "{} [Compaction][Recover] Recover compaction successfully, delete log file {}",
- fullStorageGroupName,
- compactionLogFile);
- FileUtils.delete(compactionLogFile);
- } catch (IOException e) {
- LOGGER.error(
- "{} [Compaction][Recover] Exception occurs while deleting log file {}, set allowCompaction to false",
- fullStorageGroupName,
- compactionLogFile,
- e);
- tsFileManager.setAllowCompaction(false);
- }
- }
- }
- }
- }
-
- /**
- * All source files exist: (1) delete all the target files and tmp target files (2) delete
- * compaction mods files.
- */
- private boolean handleWithAllSourceFilesExist(
- List<TsFileIdentifier> targetFileIdentifiers, List<TsFileIdentifier> sourceFileIdentifiers) {
- LOGGER.info(
- "{} [Compaction][Recover] all source files exists, delete all target files.",
- fullStorageGroupName);
-
- for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
- // xxx.cross
- File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
- // xxx.tsfile
- File targetFile =
- getFileFromDataDirs(
- targetFileIdentifier
- .getFilePath()
- .replace(
- IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
- TsFileConstant.TSFILE_SUFFIX));
- TsFileResource targetResource = null;
- if (tmpTargetFile != null) {
- targetResource = new TsFileResource(tmpTargetFile);
- } else if (targetFile != null) {
- targetResource = new TsFileResource(targetFile);
- }
-
- if (targetResource != null && !targetResource.remove()) {
- // failed to remove tmp target tsfile
- // system should not carry out the subsequent compaction in case of data redundant
- LOGGER.warn(
- "{} [Compaction][Recover] failed to remove target file {}",
- fullStorageGroupName,
- targetResource);
- return false;
- }
- }
-
- // delete compaction mods files
- List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
- for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
- sourceTsFileResourceList.add(new TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
- }
- try {
- CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, Collections.emptyList());
- } catch (Throwable e) {
- LOGGER.error(
- "{} [Compaction][Recover] Exception occurs while deleting compaction mods file, set allowCompaction to false",
- fullStorageGroupName,
- e);
- return false;
- }
- return true;
- }
-
- /**
- * Some source files lost: delete remaining source files, encluding: tsfile, resource file, mods
- * file and compaction mods file.
- */
- private boolean handleWithoutAllSourceFilesExist(List<TsFileIdentifier> sourceFileIdentifiers) {
- // some source files have been deleted, while target file must exist.
- boolean handleSuccess = true;
- List<TsFileResource> remainSourceTsFileResources = new ArrayList<>();
- for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
- File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
- if (sourceFile != null) {
- TsFileResource resource = new TsFileResource(sourceFile);
- resource.setStatus(TsFileResourceStatus.CLOSED);
- remainSourceTsFileResources.add(resource);
- } else {
- // if source file does not exist, its resource file may still exist, so delete it.
- File resourceFile =
- getFileFromDataDirs(
- sourceFileIdentifier.getFilePath() + TsFileResource.RESOURCE_SUFFIX);
- if (resourceFile != null && !resourceFile.delete()) {
- LOGGER.error(
- "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
- fullStorageGroupName,
- resourceFile);
- handleSuccess = false;
- }
- }
- // delete .compaction.mods file and .mods file of all source files
- File compactionModFile =
- getFileFromDataDirs(
- sourceFileIdentifier.getFilePath() + ModificationFile.COMPACTION_FILE_SUFFIX);
- File modFile =
- getFileFromDataDirs(sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX);
- if (compactionModFile != null && !compactionModFile.delete()) {
- LOGGER.error(
- "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
- fullStorageGroupName,
- compactionModFile);
- handleSuccess = false;
- }
- if (modFile != null && !modFile.delete()) {
- LOGGER.error(
- "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
- fullStorageGroupName,
- modFile);
- handleSuccess = false;
- }
- }
- // delete remaining source files
- if (!InnerSpaceCompactionUtils.deleteTsFilesInDisk(
- remainSourceTsFileResources, fullStorageGroupName)) {
- LOGGER.error(
- "{} [Compaction][Recover] fail to delete remaining source files.", fullStorageGroupName);
- handleSuccess = false;
- }
- return handleSuccess;
- }
-
- /** Delete tmp target file and compaction mods file. */
- private boolean handleWithAllSourceFilesExistFromOld(
- List<TsFileIdentifier> targetFileIdentifiers) {
- // delete tmp target file
- for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
- // xxx.tsfile.merge
- File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
- if (tmpTargetFile != null) {
- tmpTargetFile.delete();
- }
- }
- File compactionModsFileFromOld =
- new File(
- tsFileManager.getStorageGroupDir()
- + File.separator
- + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
- if (compactionModsFileFromOld.exists() && !compactionModsFileFromOld.delete()) {
- LOGGER.error(
- "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
- fullStorageGroupName,
- compactionModsFileFromOld);
- return false;
- }
- return true;
- }
-
- /**
- * 1. If target file does not exist, then move .merge file to target file <br>
- * 2. If target resource file does not exist, then serialize it. <br>
- * 3. Append merging modification to target mods file and delete merging mods file. <br>
- * 4. Delete source files and .merge file. <br>
- */
- private boolean handleWithoutAllSourceFilesExistFromOld(
- List<TsFileIdentifier> targetFileIdentifiers, List<TsFileIdentifier> sourceFileIdentifiers) {
- try {
- File compactionModsFileFromOld =
- new File(
- tsFileManager.getStorageGroupDir()
- + File.separator
- + IoTDBConstant.COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD);
- List<TsFileResource> targetFileResources = new ArrayList<>();
- for (int i = 0; i < sourceFileIdentifiers.size(); i++) {
- TsFileIdentifier sourceFileIdentifier = sourceFileIdentifiers.get(i);
- if (sourceFileIdentifier.isSequence()) {
- File tmpTargetFile = targetFileIdentifiers.get(i).getFileFromDataDirs();
- File targetFile = null;
-
- // move tmp target file to target file if not exist
- if (tmpTargetFile != null) {
- // move tmp target file to target file
- String sourceFilePath =
- tmpTargetFile
- .getPath()
- .replace(
- TsFileConstant.TSFILE_SUFFIX
- + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
- TsFileConstant.TSFILE_SUFFIX);
- targetFile = TsFileNameGenerator.increaseCrossCompactionCnt(new File(sourceFilePath));
- FSFactoryProducer.getFSFactory().moveFile(tmpTargetFile, targetFile);
- } else {
- // target file must exist
- File file =
- TsFileNameGenerator.increaseCrossCompactionCnt(
- new File(
- targetFileIdentifiers
- .get(i)
- .getFilePath()
- .replace(
- TsFileConstant.TSFILE_SUFFIX
- + IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX_FROM_OLD,
- TsFileConstant.TSFILE_SUFFIX)));
-
- targetFile = getFileFromDataDirs(file.getPath());
- }
- if (targetFile == null) {
- LOGGER.error(
- "{} [Compaction][Recover] target file of source seq file {} does not exist (<0.13).",
- fullStorageGroupName,
- sourceFileIdentifier.getFilePath());
- return false;
- }
-
- // serialize target resource file if not exist
- TsFileResource targetResource = new TsFileResource(targetFile);
- if (!targetResource.resourceFileExists()) {
- try (TsFileSequenceReader reader =
- new TsFileSequenceReader(targetFile.getAbsolutePath())) {
- FileLoaderUtils.updateTsFileResource(reader, targetResource);
- }
- targetResource.serialize();
- }
-
- targetFileResources.add(targetResource);
-
- // append compaction modifications to target mods file and delete compaction mods file
- if (compactionModsFileFromOld.exists()) {
- ModificationFile compactionModsFile =
- new ModificationFile(compactionModsFileFromOld.getPath());
- appendCompactionModificationsFromOld(targetResource, compactionModsFile);
- }
-
- // delete tmp target file
- if (tmpTargetFile != null) {
- tmpTargetFile.delete();
- }
- }
-
- // delete source tsfile
- File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
- if (sourceFile != null) {
- sourceFile.delete();
- }
-
- // delete source resource file
- sourceFile =
- getFileFromDataDirs(
- sourceFileIdentifier.getFilePath() + TsFileResource.RESOURCE_SUFFIX);
- if (sourceFile != null) {
- sourceFile.delete();
- }
-
- // delete source mods file
- sourceFile =
- getFileFromDataDirs(sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX);
- if (sourceFile != null) {
- sourceFile.delete();
- }
- }
-
- // delete compaction mods file
- if (compactionModsFileFromOld.exists() && !compactionModsFileFromOld.delete()) {
- LOGGER.error(
- "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
- fullStorageGroupName,
- compactionModsFileFromOld);
- return false;
- }
- } catch (Throwable e) {
- LOGGER.error(
- "{} [Compaction][Recover] fail to handle with some source files lost from old version.",
- fullStorageGroupName,
- e);
- return false;
- }
-
- return true;
- }
-
- public static void appendCompactionModificationsFromOld(
- TsFileResource resource, ModificationFile compactionModsFile) throws IOException {
-
- if (compactionModsFile != null) {
- for (Modification modification : compactionModsFile.getModifications()) {
- // we have to set modification offset to MAX_VALUE, as the offset of source chunk may
- // change after compaction
- modification.setFileOffset(Long.MAX_VALUE);
- resource.getModFile().write(modification);
- }
- resource.getModFile().close();
- }
- }
-
- /**
- * This method find the File object of given filePath by searching it in every data directory. If
- * the file is not found, it will return null.
- */
- private File getFileFromDataDirs(String filePath) {
- String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
- for (String dataDir : dataDirs) {
- File f = new File(dataDir, filePath);
- if (f.exists()) {
- return f;
- }
- }
- return null;
- }
-
- @Override
- public boolean equalsOtherTask(AbstractCompactionTask other) {
- if (other instanceof RewriteCrossCompactionRecoverTask) {
- return compactionLogFile.equals(
- ((RewriteCrossCompactionRecoverTask) other).compactionLogFile);
- }
- return false;
- }
-
- @Override
- public boolean checkValidAndSetMerging() {
- return compactionLogFile.exists();
- }
-
- /** Return whether compaction log file is from previous version (<0.13). */
- private boolean isOldLog() {
- return compactionLogFile.getName().equals(CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
index dc24ed005a..e526893381 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.commons.io.FileUtils;
@@ -55,6 +56,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
protected TsFileResourceList seqTsFileResourceList;
protected TsFileResourceList unseqTsFileResourceList;
private File logFile;
+ private long memoryCost = -1;
private List<TsFileResource> targetTsfileResourceList;
private List<TsFileResource> holdReadLockList = new ArrayList<>();
@@ -67,7 +69,8 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
TsFileManager tsFileManager,
List<TsFileResource> selectedSeqTsFileResourceList,
List<TsFileResource> selectedUnSeqTsFileResourceList,
- AtomicInteger currentTaskNum) {
+ AtomicInteger currentTaskNum,
+ long memoryCost) {
super(
logicalStorageGroupName + "-" + virtualStorageGroupName,
timePartitionId,
@@ -79,10 +82,17 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
this.selectedUnSeqTsFileResourceList = selectedUnSeqTsFileResourceList;
this.seqTsFileResourceList = tsFileManager.getSequenceListByTimePartition(timePartition);
this.unseqTsFileResourceList = tsFileManager.getUnsequenceListByTimePartition(timePartition);
+ this.memoryCost = memoryCost;
}
@Override
protected void doCompaction() throws Exception {
+ try {
+ SystemInfo.getInstance().addCompactionMemoryCost(memoryCost);
+ } catch (InterruptedException e) {
+ logger.error("Thread get interrupted when allocating memory for compaction", e);
+ return;
+ }
try {
executeCompaction();
} catch (Throwable throwable) {
@@ -100,6 +110,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
true);
throw throwable;
} finally {
+ SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
releaseAllLock();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 1f48abf35d..e7590e5589 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceM
import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -43,7 +42,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +49,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -171,34 +168,6 @@ public class InnerSpaceCompactionUtils {
}
}
- /**
- * This method is called to recover modifications while an exception occurs during compaction. It
- * append new modifications of each selected tsfile to its corresponding old mods file and delete
- * the compaction mods file.
- *
- * @param selectedTsFileResources
- * @throws IOException
- */
- public static void appendNewModificationsToOldModsFile(
- List<TsFileResource> selectedTsFileResources) throws IOException {
- for (TsFileResource sourceFile : selectedTsFileResources) {
- // if there are modifications to this seqFile during compaction
- if (sourceFile.getCompactionModFile().exists()) {
- ModificationFile compactionModificationFile =
- ModificationFile.getCompactionMods(sourceFile);
- Collection<Modification> newModification = compactionModificationFile.getModifications();
- compactionModificationFile.close();
- // write the new modifications to its old modification file
- try (ModificationFile oldModificationFile = sourceFile.getModFile()) {
- for (Modification modification : newModification) {
- oldModificationFile.write(modification);
- }
- }
- FileUtils.delete(new File(ModificationFile.getCompactionMods(sourceFile).getFilePath()));
- }
- }
- }
-
/**
* Collect all the compaction modification files of source files, and combines them as the
* modification file of target file.
@@ -248,13 +217,6 @@ public class InnerSpaceCompactionUtils {
}
}
- public static class TsFileNameComparator implements Comparator<TsFileSequenceReader> {
-
- @Override
- public int compare(TsFileSequenceReader o1, TsFileSequenceReader o2) {
- return TsFileManager.compareFileName(new File(o1.getFileName()), new File(o2.getFileName()));
- }
- }
/**
* Update the targetResource. Move xxx.target to xxx.tsfile and serialize xxx.tsfile.resource .
*
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
index 0f5b667c49..b156483116 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java
@@ -48,7 +48,7 @@ public class PrimitiveArrayManager {
/** threshold total size of arrays for all data types */
private static final double POOLED_ARRAYS_MEMORY_THRESHOLD =
- CONFIG.getAllocateMemoryForWrite()
+ CONFIG.getAllocateMemoryForStorageEngine()
* CONFIG.getBufferedArraysMemoryProportion()
/ AMPLIFICATION_FACTOR;
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 334bb965ca..8f3fa6c48e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
public class SystemInfo {
@@ -43,10 +44,14 @@ public class SystemInfo {
private long totalStorageGroupMemCost = 0L;
private volatile boolean rejected = false;
- private static long memorySizeForWrite = config.getAllocateMemoryForWrite();
+ private static long memorySizeForWrite =
+ (long) (config.getAllocateMemoryForStorageEngine() * config.getWriteProportion());
+ private static long memorySizeForCompaction =
+ (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion());
private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
private long flushingMemTablesCost = 0L;
+ private AtomicLong compactionMemoryCost = new AtomicLong(0L);
private ExecutorService flushTaskSubmitThreadPool =
IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
@@ -269,4 +274,25 @@ public class SystemInfo {
public int flushingMemTableNum() {
return FlushManager.getInstance().getNumberOfWorkingTasks();
}
+
+ public void addCompactionMemoryCost(long memoryCost) throws InterruptedException {
+ long originSize = this.compactionMemoryCost.get();
+ while (originSize + memoryCost > memorySizeForCompaction
+ || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) {
+ Thread.sleep(100);
+ originSize = this.compactionMemoryCost.get();
+ }
+ }
+
+ public synchronized void resetCompactionMemoryCost(long compactionMemoryCost) {
+ this.compactionMemoryCost.addAndGet(-compactionMemoryCost);
+ }
+
+ public long getMemorySizeForCompaction() {
+ return memorySizeForCompaction;
+ }
+
+ public void setMemorySizeForCompaction(long size) {
+ memorySizeForCompaction = size;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index 4aff8c0d9e..ddc3128376 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -285,9 +286,14 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(100);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCrossCompactionMemoryBudget(2 * 1024 * 1024L * 1024L);
+ long originSize = SystemInfo.getInstance().getMemorySizeForCompaction();
+ SystemInfo.getInstance()
+ .setMemorySizeForCompaction(
+ 2
+ * 1024L
+ * 1024L
+ * 1024L
+ * IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread());
String sgName = COMPACTION_TEST_SG + "test2";
try {
IoTDB.metaManager.setStorageGroup(new PartialPath(sgName));
@@ -388,6 +394,7 @@ public class CompactionSchedulerTest {
IoTDBDescriptor.getInstance()
.getConfig()
.setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
+ SystemInfo.getInstance().setMemorySizeForCompaction(originSize);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index fa6c4386c2..32bb1e97a0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -278,7 +278,8 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
tsFileManager,
seqResources,
unseqResources,
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
for (TsFileResource resource : seqResources) {
Assert.assertFalse(resource.isCompactionCandidate());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
index 852b66aa62..eadfd6b71e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
@@ -438,7 +438,8 @@ public class CrossSpaceCompactionTest {
"0",
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
mergeResource.getSeqFiles(),
- mergeResource.getUnseqFiles());
+ mergeResource.getUnseqFiles(),
+ 0);
compactionTask.call();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
for (TsFileResource seqResource : seqResources) {
@@ -741,7 +742,8 @@ public class CrossSpaceCompactionTest {
"0",
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
mergeResource.getSeqFiles(),
- mergeResource.getUnseqFiles());
+ mergeResource.getUnseqFiles(),
+ 0);
compactionTask.call();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
for (TsFileResource seqResource : seqResources.subList(1, 4)) {
@@ -1043,7 +1045,8 @@ public class CrossSpaceCompactionTest {
"0",
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
mergeResource.getSeqFiles(),
- mergeResource.getUnseqFiles());
+ mergeResource.getUnseqFiles(),
+ 0);
compactionTask.call();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
for (TsFileResource seqResource : seqResources.subList(1, 4)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
index 83ac4b538b..95fcd6d0b3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionValidationTest.java
@@ -97,7 +97,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -134,7 +141,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -171,7 +185,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -216,7 +237,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(3), unseqResources.get(3));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -258,7 +286,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -298,7 +333,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(2), unseqResources.get(2));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -341,7 +383,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(3), unseqResources.get(3));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -383,7 +432,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(3), unseqResources.get(3));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -424,7 +480,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -466,7 +529,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -508,7 +578,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -550,7 +627,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -593,7 +677,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -637,7 +728,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -681,7 +779,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -726,7 +831,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -772,7 +884,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -818,7 +937,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -864,7 +990,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -910,7 +1043,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -955,7 +1095,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -996,7 +1143,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1038,7 +1192,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1080,7 +1241,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1122,7 +1290,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1165,7 +1340,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1209,7 +1391,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1253,7 +1442,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1298,7 +1494,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1344,7 +1547,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1390,7 +1600,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1436,7 +1653,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1482,7 +1706,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1527,7 +1758,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1570,7 +1808,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1614,7 +1859,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[0].get(1), seqResources.get(3));
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1658,7 +1910,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1702,7 +1961,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
Assert.assertEquals(result[1].get(1), unseqResources.get(1));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
@@ -1747,7 +2013,14 @@ public class CrossSpaceCompactionValidationTest extends AbstractCompactionTest {
Assert.assertEquals(result[1].get(0), unseqResources.get(0));
new RewriteCrossSpaceCompactionTask(
- "0", COMPACTION_TEST_SG, 0, tsFileManager, result[0], result[1], new AtomicInteger(0))
+ "0",
+ COMPACTION_TEST_SG,
+ 0,
+ tsFileManager,
+ result[0],
+ result[1],
+ new AtomicInteger(0),
+ 0)
.call();
validateSeqFiles();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
index 040134b4d4..9aa3d9b76f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
@@ -227,7 +227,8 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
tsFileManager,
seqResources,
unseqResources,
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
rewriteCrossSpaceCompactionTask.call();
for (TsFileResource resource : seqResources) {
@@ -464,7 +465,8 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
tsFileManager,
seqResources,
unseqResources,
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
rewriteCrossSpaceCompactionTask.call();
for (TsFileResource resource : seqResources) {
@@ -611,7 +613,8 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
vsgp.getTsFileResourceManager(),
seqResources,
unseqResources,
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
rewriteCrossSpaceCompactionTask.setSourceFilesToCompactionCandidate();
rewriteCrossSpaceCompactionTask.checkValidAndSetMerging();
// delete data in source file during compaction
@@ -731,7 +734,8 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
vsgp.getTsFileResourceManager(),
seqResources,
unseqResources,
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
rewriteCrossSpaceCompactionTask.setSourceFilesToCompactionCandidate();
rewriteCrossSpaceCompactionTask.checkValidAndSetMerging();
// delete data in source file during compaction
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedCrossSpaceCompactionTaskFactory.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedCrossSpaceCompactionTaskFactory.java
deleted file mode 100644
index 1fb4b2e3fc..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/task/FakedCrossSpaceCompactionTaskFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.compaction.task;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-
-import java.util.List;
-
-public class FakedCrossSpaceCompactionTaskFactory {
- public AbstractCompactionTask createTask(
- String logicalStorageGroupName,
- String virtualStorageGroupName,
- long timePartitionId,
- TsFileManager tsFileManager,
- List<TsFileResource> selectedSeqTsFileResourceList,
- List<TsFileResource> selectedUnSeqTsFileResourceList) {
- return IoTDBDescriptor.getInstance()
- .getConfig()
- .getCrossCompactionStrategy()
- .getCompactionTask(
- logicalStorageGroupName,
- virtualStorageGroupName,
- timePartitionId,
- tsFileManager,
- selectedSeqTsFileResourceList,
- selectedUnSeqTsFileResourceList);
- }
-}