You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/02/12 05:09:59 UTC
[iotdb] branch master updated: [IOTDB-2500] [ query & cross-compaction ] cross-compaction stuck (#5031)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6d9ae99 [IOTDB-2500] [ query & cross-compaction ] cross-compaction stuck (#5031)
6d9ae99 is described below
commit 6d9ae99d1a1db524c9b1f44ae7e2c9610576fe4b
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Sat Feb 12 13:09:20 2022 +0800
[IOTDB-2500] [ query & cross-compaction ] cross-compaction stuck (#5031)
---
.../db/engine/compaction/CompactionUtils.java | 43 +++----
.../CrossSpaceCompactionExceptionHandler.java | 134 +++++++++------------
.../RewriteCrossSpaceCompactionLogAnalyzer.java | 8 +-
.../selector/RewriteCompactionFileSelector.java | 8 +-
.../task/RewriteCrossCompactionRecoverTask.java | 29 +++--
.../task/RewriteCrossSpaceCompactionTask.java | 59 ++++-----
.../InnerSpaceCompactionExceptionHandler.java | 55 ++++-----
.../SizeTieredCompactionRecoverTask.java | 7 +-
.../inner/sizetiered/SizeTieredCompactionTask.java | 32 ++---
.../db/engine/storagegroup/TsFileManager.java | 40 ++++++
.../iotdb/db/rescon/TsFileResourceManager.java | 22 ++--
.../cross/CrossSpaceCompactionExceptionTest.java | 50 ++++++--
.../cross/RewriteCompactionFileSelectorTest.java | 2 +
.../RewriteCrossSpaceCompactionRecoverTest.java | 4 +-
.../inner/InnerSpaceCompactionExceptionTest.java | 16 +--
.../SizeTieredCompactionRecoverTest.java | 2 +-
.../storagegroup/StorageGroupProcessorTest.java | 4 +-
17 files changed, 283 insertions(+), 232 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 47726e4..4102f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -48,14 +48,12 @@ import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -162,6 +160,7 @@ public class CompactionUtils {
MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
deviceIterator.iterateNotAlignedSeries(device, false);
Set<String> allMeasurements = measurementIterator.getAllMeasurements();
+ Set<String> allMeasurementSet = new HashSet<>(allMeasurements);
for (String measurement : allMeasurements) {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
measurementSchemas.add(
@@ -172,7 +171,7 @@ public class CompactionUtils {
device,
Collections.singletonList(measurement),
measurementSchemas,
- new HashSet<>(allMeasurements),
+ allMeasurementSet,
queryContext,
queryDataSource,
false);
@@ -359,30 +358,20 @@ public class CompactionUtils {
targetFile.getModFile().close();
}
- /**
- * This method is called to recover modifications while an exception occurs during compaction. It
- * appends new modifications of each selected tsfile to its corresponding old mods file and delete
- * the compaction mods file.
- *
- * @param selectedTsFileResources
- * @throws IOException
- */
- public static void appendNewModificationsToOldModsFile(
- List<TsFileResource> selectedTsFileResources) throws IOException {
- for (TsFileResource sourceFile : selectedTsFileResources) {
- // if there are modifications to this seqFile during compaction
- if (sourceFile.getCompactionModFile().exists()) {
- ModificationFile compactionModificationFile =
- ModificationFile.getCompactionMods(sourceFile);
- Collection<Modification> newModification = compactionModificationFile.getModifications();
- compactionModificationFile.close();
- // write the new modifications to its old modification file
- try (ModificationFile oldModificationFile = sourceFile.getModFile()) {
- for (Modification modification : newModification) {
- oldModificationFile.write(modification);
- }
- }
- FileUtils.delete(new File(ModificationFile.getCompactionMods(sourceFile).getFilePath()));
+ public static void deleteCompactionModsFile(
+ List<TsFileResource> selectedSeqTsFileResourceList,
+ List<TsFileResource> selectedUnSeqTsFileResourceList)
+ throws IOException {
+ for (TsFileResource seqFile : selectedSeqTsFileResourceList) {
+ ModificationFile modificationFile = seqFile.getCompactionModFile();
+ if (modificationFile.exists()) {
+ modificationFile.remove();
+ }
+ }
+ for (TsFileResource unseqFile : selectedUnSeqTsFileResourceList) {
+ ModificationFile modificationFile = unseqFile.getCompactionModFile();
+ if (modificationFile.exists()) {
+ modificationFile.remove();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionHandler.java
index c941f23..6782e81 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionHandler.java
@@ -23,13 +23,10 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.recover.RewriteCrossSpaceCompactionLogAnalyzer;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.recover.RewriteCrossSpaceCompactionLogger;
-import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -39,7 +36,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
public class CrossSpaceCompactionExceptionHandler {
@@ -52,7 +48,8 @@ public class CrossSpaceCompactionExceptionHandler {
List<TsFileResource> targetResourceList,
List<TsFileResource> seqResourceList,
List<TsFileResource> unseqResourceList,
- TsFileManager tsFileManager) {
+ TsFileManager tsFileManager,
+ long timePartition) {
try {
if (logFile == null || !logFile.exists()) {
// the log file is null or the log file does not exists
@@ -75,23 +72,19 @@ public class CrossSpaceCompactionExceptionHandler {
boolean allUnseqFilesExist = checkAllSourceFileExists(unseqResourceList, lostUnseqFiles);
if (allSeqFilesExist && allUnseqFilesExist) {
- // all source files exists, remove target file
+ // all source files exist, remove target file and recover memory
handleSuccess =
handleWhenAllSourceFilesExist(
storageGroup,
targetResourceList,
seqResourceList,
unseqResourceList,
- tsFileManager);
+ tsFileManager,
+ timePartition);
} else {
handleSuccess =
handleWhenSomeSourceFilesLost(
- storageGroup,
- targetResourceList,
- seqResourceList,
- unseqResourceList,
- logFile,
- tsFileManager);
+ storageGroup, seqResourceList, unseqResourceList, logFile);
}
if (!handleSuccess) {
@@ -125,71 +118,78 @@ public class CrossSpaceCompactionExceptionHandler {
}
/**
- * All source files exists, convert compaction modification to normal modification and delete
- * target files. To avoid triggering OOM again under OOM errors, we do not check whether the
- * target files are complete.
+ * When all source files exist: (1) delete compaction mods files (2) delete target files, tmp
+ * target files and their corresponding files (3) recover memory. To avoid triggering OOM again
+ * under OOM errors, we do not check whether the target files are complete.
*/
private static boolean handleWhenAllSourceFilesExist(
String storageGroup,
List<TsFileResource> targetTsFiles,
List<TsFileResource> seqFileList,
List<TsFileResource> unseqFileList,
- TsFileManager tsFileManager)
+ TsFileManager tsFileManager,
+ long timePartition)
throws IOException {
- for (TsFileResource seqFile : seqFileList) {
- ModificationFile compactionModFile = ModificationFile.getCompactionMods(seqFile);
- if (compactionModFile.exists()) {
- Collection<Modification> modifications = compactionModFile.getModifications();
- ModificationFile normalModification = ModificationFile.getNormalMods(seqFile);
- for (Modification modification : modifications) {
- normalModification.write(modification);
+ TsFileResourceList unseqTsFileResourceList =
+ tsFileManager.getUnsequenceListByTimePartition(timePartition);
+ TsFileResourceList seqTsFileResourceList =
+ tsFileManager.getSequenceListByTimePartition(timePartition);
+
+ // delete compaction mods files
+ CompactionUtils.deleteCompactionModsFile(seqFileList, unseqFileList);
+
+ boolean removeAllTargetFile = true;
+ tsFileManager.writeLock("CrossSpaceCompactionExceptionHandler");
+ try {
+ for (TsFileResource targetTsFile : targetTsFiles) {
+ // delete target files
+ targetTsFile.writeLock();
+ if (!targetTsFile.remove()) {
+ LOGGER.error(
+ "{} [Compaction][Exception] failed to delete target tsfile {} when handling exception",
+ storageGroup,
+ targetTsFile);
+ removeAllTargetFile = false;
}
- normalModification.close();
- compactionModFile.close();
- FileUtils.delete(new File(compactionModFile.getFilePath()));
- }
- }
+ targetTsFile.writeUnlock();
- for (TsFileResource unseqFile : unseqFileList) {
- ModificationFile compactionModFile = ModificationFile.getCompactionMods(unseqFile);
- if (compactionModFile.exists()) {
- Collection<Modification> modifications = compactionModFile.getModifications();
- ModificationFile normalModification = ModificationFile.getNormalMods(unseqFile);
- for (Modification modification : modifications) {
- normalModification.write(modification);
+ // remove target tsfile resource in memory
+ if (targetTsFile.isFileInList()) {
+ seqTsFileResourceList.remove(targetTsFile);
+ TsFileResourceManager.getInstance().removeTsFileResource(targetTsFile);
}
- normalModification.close();
- compactionModFile.close();
- FileUtils.delete(new File(compactionModFile.getFilePath()));
}
- }
- boolean removeAllTargetFile = true;
- for (TsFileResource targetTsFile : targetTsFiles) {
- if (!targetTsFile.remove()) {
- LOGGER.error(
- "{} [Compaction][Exception] failed to delete target tsfile {} when handling exception",
- storageGroup,
- targetTsFile);
- removeAllTargetFile = false;
+ // recover source tsfile resource in memory
+ for (TsFileResource tsFileResource : seqFileList) {
+ if (!tsFileResource.isFileInList()) {
+ seqTsFileResourceList.keepOrderInsert(tsFileResource);
+ TsFileResourceManager.getInstance().registerSealedTsFileResource(tsFileResource);
+ }
}
+ for (TsFileResource tsFileResource : unseqFileList) {
+ if (!tsFileResource.isFileInList()) {
+ unseqTsFileResourceList.keepOrderInsert(tsFileResource);
+ TsFileResourceManager.getInstance().registerSealedTsFileResource(tsFileResource);
+ }
+ }
+ } finally {
+ tsFileManager.writeUnlock();
}
return removeAllTargetFile;
}
/**
* Some source files are lost, check if the compaction has finished. If the compaction has
- * finished, try to rename the target files and delete source files. If the compaction has not
- * finished, set the allowCompaction in tsFileManager to false and print some error logs.
+ * finished, delete the remaining source files and compaction mods files. If the compaction has
+ * not finished, set the allowCompaction in tsFileManager to false and print some error logs.
*/
- private static boolean handleWhenSomeSourceFilesLost(
+ public static boolean handleWhenSomeSourceFilesLost(
String storageGroup,
- List<TsFileResource> targetResources,
List<TsFileResource> seqFileList,
List<TsFileResource> unseqFileList,
- File logFile,
- TsFileManager tsFileManager)
- throws IOException, WriteProcessException {
+ File logFile)
+ throws IOException {
long magicStringLength =
RewriteCrossSpaceCompactionLogger.MAGIC_STRING.getBytes(StandardCharsets.UTF_8).length;
long fileLength = logFile.length();
@@ -217,7 +217,7 @@ public class CrossSpaceCompactionExceptionHandler {
}
// read tail string in compaction log
- if (!logAnalyzer.isAllTargetFilesExisted()) {
+ if (!logAnalyzer.isEndMagicStringExisted()) {
LOGGER.error(
"{} [Compaction][ExceptionHandler] the tail magic string in compaction log is incorrect,"
+ " failed to handle exception",
@@ -225,34 +225,18 @@ public class CrossSpaceCompactionExceptionHandler {
return false;
}
- // compaction finish, rename the target file
- CompactionUtils.moveTargetFile(targetResources, false, storageGroup);
-
- // delete the source files
- TsFileResourceList unseqTsFileResourceList =
- tsFileManager.getUnsequenceListByTimePartition(unseqFileList.get(0).getTimePartition());
- TsFileResourceList seqTsFileResourceList =
- tsFileManager.getSequenceListByTimePartition(seqFileList.get(0).getTimePartition());
+ // delete source files
for (TsFileResource unseqFile : unseqFileList) {
unseqFile.remove();
unseqFile.setDeleted(true);
- unseqTsFileResourceList.remove(unseqFile);
}
-
for (TsFileResource seqFile : seqFileList) {
seqFile.remove();
seqFile.setDeleted(true);
- seqTsFileResourceList.remove(seqFile);
- }
-
- for (TsFileResource targetFile : targetResources) {
- targetFile.setClosed(true);
- seqTsFileResourceList.keepOrderInsert(targetFile);
}
// delete compaction mods files
- InnerSpaceCompactionUtils.deleteModificationForSourceFile(seqFileList, storageGroup);
- InnerSpaceCompactionUtils.deleteModificationForSourceFile(unseqFileList, storageGroup);
+ CompactionUtils.deleteCompactionModsFile(seqFileList, unseqFileList);
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/recover/RewriteCrossSpaceCompactionLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/recover/RewriteCrossSpaceCompactionLogAnalyzer.java
index fa35014..60d4332 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/recover/RewriteCrossSpaceCompactionLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/recover/RewriteCrossSpaceCompactionLogAnalyzer.java
@@ -42,7 +42,7 @@ public class RewriteCrossSpaceCompactionLogAnalyzer {
private boolean isSeq = false;
private boolean isFirstMagicStringExisted = false;
- boolean isAllTargetFilesExisted = false;
+ boolean isEndMagicStringExisted = false;
public RewriteCrossSpaceCompactionLogAnalyzer(File logFile) {
this.logFile = logFile;
@@ -61,7 +61,7 @@ public class RewriteCrossSpaceCompactionLogAnalyzer {
if (magicCount == 0) {
isFirstMagicStringExisted = true;
} else {
- isAllTargetFilesExisted = true;
+ isEndMagicStringExisted = true;
}
magicCount++;
break;
@@ -100,8 +100,8 @@ public class RewriteCrossSpaceCompactionLogAnalyzer {
return targetFileInfos;
}
- public boolean isAllTargetFilesExisted() {
- return isAllTargetFilesExisted;
+ public boolean isEndMagicStringExisted() {
+ return isEndMagicStringExisted;
}
public boolean isFirstMagicStringExisted() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
index feb1297..4d33c05 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/selector/RewriteCompactionFileSelector.java
@@ -259,8 +259,12 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
long seqEndTime = seqFile.getEndTime(deviceId);
long seqStartTime = seqFile.getStartTime(deviceId);
if (unseqEndTime < seqStartTime) {
- // if time range in unseq file is 10-20, seq file is 30-40, then skip this seq file and
- // there is no more overlap later.
+ // Suppose the time range in unseq file is 10-20, seq file is 30-40. If this unseq file
+ // has no overlapped seq files, then select this seq file. Otherwise, skip this seq file.
+ // There is no more overlap later.
+ if (tmpSelectedSeqFiles.size() == 0) {
+ tmpSelectedSeqFiles.add(i);
+ }
noMoreOverlap = true;
} else if (!seqFile.isClosed()) {
// we cannot make sure whether unclosed file has overlap or not, so we just add it.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java
index d85a141..775c9b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossCompactionRecoverTask.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -102,7 +103,7 @@ public class RewriteCrossCompactionRecoverTask extends RewriteCrossSpaceCompacti
if (isAllSourcesFileExisted) {
handleSuccess =
handleWithAllSourceFilesExist(
- logAnalyzer, targetFileIdentifiers, sourceFileIdentifiers, fullStorageGroupName);
+ targetFileIdentifiers, sourceFileIdentifiers, fullStorageGroupName);
} else {
handleSuccess = handleWithoutAllSourceFilesExist(sourceFileIdentifiers);
}
@@ -136,19 +137,18 @@ public class RewriteCrossCompactionRecoverTask extends RewriteCrossSpaceCompacti
}
}
+ /**
+ * All source files exist: (1) delete all the target files and tmp target files (2) delete
+ * compaction mods files.
+ */
private boolean handleWithAllSourceFilesExist(
- RewriteCrossSpaceCompactionLogAnalyzer analyzer,
List<TsFileIdentifier> targetFileIdentifiers,
List<TsFileIdentifier> sourceFileIdentifiers,
String fullStorageGroupName) {
- // all source files exist, delete all the target files and tmp target files
LOGGER.info(
"{} [Compaction][Recover] all source files exists, delete all target files.",
fullStorageGroupName);
- List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
- for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
- sourceTsFileResourceList.add(new TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
- }
+
for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
// xxx.merge
File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
@@ -177,12 +177,17 @@ public class RewriteCrossCompactionRecoverTask extends RewriteCrossSpaceCompacti
return false;
}
}
- // deal with compaction modification
+
+ // delete compaction mods files
+ List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+ for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+ sourceTsFileResourceList.add(new TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+ }
try {
- CompactionUtils.appendNewModificationsToOldModsFile(sourceTsFileResourceList);
+ CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, Collections.emptyList());
} catch (Throwable e) {
LOGGER.error(
- "{} Exception occurs while handling exception, set allowCompaction to false",
+ "{} [Compaction][Recover] Exception occurs while deleting compaction mods file, set allowCompaction to false",
fullStorageGroupName,
e);
return false;
@@ -190,6 +195,10 @@ public class RewriteCrossCompactionRecoverTask extends RewriteCrossSpaceCompacti
return true;
}
+ /**
+ * Some source files lost: delete remaining source files, including: tsfile, resource file, mods
+ * file and compaction mods file.
+ */
private boolean handleWithoutAllSourceFilesExist(List<TsFileIdentifier> sourceFileIdentifiers) {
// some source files have been deleted, while target file must exist.
boolean handleSuccess = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
index 93efeef..7a00e3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
@@ -24,14 +24,13 @@ import org.apache.iotdb.db.engine.compaction.cross.AbstractCrossSpaceCompactionT
import org.apache.iotdb.db.engine.compaction.cross.CrossSpaceCompactionExceptionHandler;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.recover.RewriteCrossSpaceCompactionLogger;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.commons.io.FileUtils;
@@ -51,11 +50,12 @@ import static org.apache.iotdb.db.engine.compaction.cross.rewrite.recover.Rewrit
import static org.apache.iotdb.db.engine.compaction.cross.rewrite.recover.RewriteCrossSpaceCompactionLogger.STR_UNSEQ_FILES;
public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactionTask {
-
private static final Logger logger =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
protected List<TsFileResource> selectedSeqTsFileResourceList;
protected List<TsFileResource> selectedUnSeqTsFileResourceList;
+ protected TsFileResourceList seqTsFileResourceList;
+ protected TsFileResourceList unseqTsFileResourceList;
protected TsFileManager tsFileManager;
private File logFile;
@@ -79,6 +79,8 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
selectedUnSeqTsFileResourceList);
this.selectedSeqTsFileResourceList = selectedSeqTsFileResourceList;
this.selectedUnSeqTsFileResourceList = selectedUnSeqTsFileResourceList;
+ this.seqTsFileResourceList = tsFileManager.getSequenceListByTimePartition(timePartition);
+ this.unseqTsFileResourceList = tsFileManager.getUnsequenceListByTimePartition(timePartition);
this.tsFileManager = tsFileManager;
}
@@ -95,7 +97,8 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
targetTsfileResourceList,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList,
- tsFileManager);
+ tsFileManager,
+ timePartition);
throw throwable;
} finally {
releaseAllLock();
@@ -150,17 +153,25 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
compactionLogger.logStringInfo(MAGIC_STRING);
compactionLogger.close();
- releaseReadAndLockWrite(selectedSeqTsFileResourceList);
- releaseReadAndLockWrite(selectedUnSeqTsFileResourceList);
-
CompactionUtils.combineModsInCompaction(
selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList, targetTsfileResourceList);
+ // update tsfile resource in memory
+ tsFileManager.replace(
+ selectedSeqTsFileResourceList,
+ selectedUnSeqTsFileResourceList,
+ targetTsfileResourceList,
+ timePartition,
+ true);
+
+ releaseReadAndLockWrite(selectedSeqTsFileResourceList);
+ releaseReadAndLockWrite(selectedUnSeqTsFileResourceList);
+
deleteOldFiles(selectedSeqTsFileResourceList);
deleteOldFiles(selectedUnSeqTsFileResourceList);
- removeCompactionModification();
+ CompactionUtils.deleteCompactionModsFile(
+ selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList);
- updateTsFileResource();
if (logFile.exists()) {
FileUtils.delete(logFile);
}
@@ -171,21 +182,6 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
}
}
- private void updateTsFileResource() throws IOException {
- for (TsFileResource resource : selectedSeqTsFileResourceList) {
- TsFileResourceManager.getInstance().removeTsFileResource(resource);
- tsFileManager.remove(resource, true);
- }
- for (TsFileResource resource : selectedUnSeqTsFileResourceList) {
- TsFileResourceManager.getInstance().removeTsFileResource(resource);
- tsFileManager.remove(resource, false);
- }
- for (TsFileResource resource : targetTsfileResourceList) {
- tsFileManager.getSequenceListByTimePartition(timePartition).keepOrderInsert(resource);
- TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
- }
- }
-
private boolean addReadLock(List<TsFileResource> tsFileResourceList) {
for (TsFileResource tsFileResource : tsFileResourceList) {
tsFileResource.readLock();
@@ -230,7 +226,7 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
holdWriteLockList.clear();
}
- void deleteOldFiles(List<TsFileResource> tsFileResourceList) throws IOException {
+ private void deleteOldFiles(List<TsFileResource> tsFileResourceList) throws IOException {
for (TsFileResource tsFileResource : tsFileResourceList) {
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
tsFileResource.setDeleted(true);
@@ -245,19 +241,6 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
return fullStorageGroupName;
}
- private void removeCompactionModification() {
- try {
- for (TsFileResource seqFile : selectedSeqTsFileResourceList) {
- ModificationFile.getCompactionMods(seqFile).remove();
- }
- for (TsFileResource unseqFile : selectedUnSeqTsFileResourceList) {
- ModificationFile.getCompactionMods(unseqFile).remove();
- }
- } catch (IOException e) {
- logger.error("{} cannot remove merging modification ", fullStorageGroupName, e);
- }
- }
-
@Override
public boolean equalsOtherTask(AbstractCompactionTask other) {
if (other instanceof RewriteCrossSpaceCompactionTask) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java
index fa5b380..5478e21 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java
@@ -20,10 +20,11 @@ package org.apache.iotdb.db.engine.compaction.inner;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
+import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
@@ -75,6 +77,7 @@ public class InnerSpaceCompactionExceptionHandler {
targetTsFile,
selectedTsFileResourceList,
tsFileResourceList,
+ tsFileManager,
false);
} else {
// some source file does not exists
@@ -95,11 +98,7 @@ public class InnerSpaceCompactionExceptionHandler {
} else {
handleSuccess =
handleWhenSomeSourceFilesLost(
- fullStorageGroupName,
- targetTsFile,
- selectedTsFileResourceList,
- tsFileResourceList,
- lostSourceFiles);
+ fullStorageGroupName, targetTsFile, selectedTsFileResourceList, lostSourceFiles);
}
}
@@ -143,6 +142,7 @@ public class InnerSpaceCompactionExceptionHandler {
TsFileResource targetTsFile,
List<TsFileResource> selectedTsFileResourceList,
TsFileResourceList tsFileResourceList,
+ TsFileManager tsFileManager,
boolean isRecover) {
try {
// all source file exists, delete the target file
@@ -182,34 +182,41 @@ public class InnerSpaceCompactionExceptionHandler {
return false;
}
if (!isRecover) {
- tsFileResourceList.writeLock();
+ tsFileManager.writeLock("InnerSpaceCompactionExceptionHandler");
try {
if (targetTsFile.isFileInList()) {
// target tsfile is in the list, remove it
tsFileResourceList.remove(targetTsFile);
+ TsFileResourceManager.getInstance().removeTsFileResource(targetTsFile);
}
for (TsFileResource tsFileResource : selectedTsFileResourceList) {
// if the source file is not in tsfileResourceList
// insert it into the list
if (!tsFileResource.isFileInList()) {
tsFileResourceList.keepOrderInsert(tsFileResource);
+ TsFileResourceManager.getInstance().registerSealedTsFileResource(tsFileResource);
}
}
} finally {
- tsFileResourceList.writeUnlock();
+ tsFileManager.writeUnlock();
}
}
- if (!targetTsFile.remove()) {
- // failed to remove target tsfile
- // system should not carry out the subsequent compaction in case of data redundant
- LOGGER.error(
- "{} [Compaction][ExceptionHandler] failed to remove target file {}",
- fullStorageGroupName,
- targetTsFile);
- return false;
+ targetTsFile.writeLock();
+ try {
+ if (!targetTsFile.remove()) {
+ // failed to remove target tsfile
+ // system should not carry out the subsequent compaction in case of data redundant
+ LOGGER.error(
+ "{} [Compaction][ExceptionHandler] failed to remove target file {}",
+ fullStorageGroupName,
+ targetTsFile);
+ return false;
+ }
+ } finally {
+ targetTsFile.writeUnlock();
}
- // deal with compaction modification
- InnerSpaceCompactionUtils.appendNewModificationsToOldModsFile(selectedTsFileResourceList);
+ // delete compaction mods files
+ CompactionUtils.deleteCompactionModsFile(selectedTsFileResourceList, Collections.emptyList());
} catch (Throwable e) {
LOGGER.error(
"{} Exception occurs while handling exception, set allowCompaction to false",
@@ -224,7 +231,6 @@ public class InnerSpaceCompactionExceptionHandler {
String fullStorageGroupName,
TsFileResource targetTsFile,
List<TsFileResource> selectedTsFileResourceList,
- TsFileResourceList tsFileResourceList,
List<TsFileResource> lostSourceFiles) {
boolean handleSuccess = true;
try {
@@ -241,17 +247,12 @@ public class InnerSpaceCompactionExceptionHandler {
fullStorageGroupName,
sourceFile);
handleSuccess = false;
- } else {
- tsFileResourceList.remove(sourceFile);
}
}
+ // delete compaction mods files
+ CompactionUtils.deleteCompactionModsFile(
+ selectedTsFileResourceList, Collections.emptyList());
- InnerSpaceCompactionUtils.deleteModificationForSourceFile(
- selectedTsFileResourceList, fullStorageGroupName);
-
- if (!tsFileResourceList.contains(targetTsFile)) {
- tsFileResourceList.keepOrderInsert(targetTsFile);
- }
} else {
// target file is not complete, and some source file is lost
// some data is lost
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
index 169f98f..78d709d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
@@ -146,7 +146,12 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
}
handleSuccess =
InnerSpaceCompactionExceptionHandler.handleWhenAllSourceFilesExist(
- fullStorageGroupName, targetResource, sourceResources, tsFileResourceList, true);
+ fullStorageGroupName,
+ targetResource,
+ sourceResources,
+ tsFileResourceList,
+ tsFileManager,
+ true);
} else {
handleSuccess = handleWithoutAllSourceFilesExist(sourceFileIdentifiers);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index c1f5130..aac00fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
-import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.commons.io.FileUtils;
@@ -145,24 +144,23 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
}
// replace the old files with new file, the new is in same position as the old
- for (TsFileResource resource : selectedTsFileResourceList) {
- TsFileResourceManager.getInstance().removeTsFileResource(resource);
- }
- TsFileResourceManager.getInstance().registerSealedTsFileResource(targetTsFileResource);
- tsFileResourceList.writeLock();
- try {
- for (TsFileResource resource : selectedTsFileResourceList) {
- tsFileResourceList.remove(resource);
- }
- tsFileResourceList.keepOrderInsert(targetTsFileResource);
- } finally {
- tsFileResourceList.writeUnlock();
+ if (sequence) {
+ tsFileManager.replace(
+ selectedTsFileResourceList,
+ Collections.emptyList(),
+ Collections.singletonList(targetTsFileResource),
+ timePartition,
+ true);
+ } else {
+ tsFileManager.replace(
+ Collections.emptyList(),
+ selectedTsFileResourceList,
+ Collections.singletonList(targetTsFileResource),
+ timePartition,
+ false);
}
LOGGER.info(
- "{} [Compaction] compaction finish, start to delete old files", fullStorageGroupName);
-
- LOGGER.info(
"{} [Compaction] Compacted target files, try to get the write lock of source files",
fullStorageGroupName);
@@ -183,6 +181,8 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
targetTsFileResource));
}
+ LOGGER.info(
+ "{} [Compaction] compaction finish, start to delete old files", fullStorageGroupName);
// delete the old files
InnerSpaceCompactionUtils.deleteTsFilesInDisk(
selectedTsFileResourceList, fullStorageGroupName);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 92b7204..e667792 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -171,6 +172,45 @@ public class TsFileManager {
}
}
+ /** This method is called after compaction to update memory. */
+ public void replace(
+ List<TsFileResource> seqFileResources,
+ List<TsFileResource> unseqFileResources,
+ List<TsFileResource> targetFileResources,
+ long timePartition,
+ boolean isTargetSequence)
+ throws IOException {
+ writeLock("replace");
+ try {
+ for (TsFileResource tsFileResource : seqFileResources) {
+ if (sequenceFiles.get(timePartition).remove(tsFileResource)) {
+ TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
+ }
+ }
+ for (TsFileResource tsFileResource : unseqFileResources) {
+ if (unsequenceFiles.get(timePartition).remove(tsFileResource)) {
+ TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource);
+ }
+ }
+ if (isTargetSequence) {
+ // seq inner space compaction or cross space compaction
+ for (TsFileResource resource : targetFileResources) {
+ TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
+ sequenceFiles.get(timePartition).keepOrderInsert(resource);
+ }
+ } else {
+ // unseq inner space compaction
+ for (TsFileResource resource : targetFileResources) {
+ TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
+ unsequenceFiles.get(timePartition).keepOrderInsert(resource);
+ }
+ }
+
+ } finally {
+ writeUnlock();
+ }
+ }
+
public boolean contains(TsFileResource tsFileResource, boolean sequence) {
readLock();
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
index 4d9b411..a52b241 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/TsFileResourceManager.java
@@ -60,19 +60,23 @@ public class TsFileResourceManager {
* memory cost is larger than threshold, degradation is triggered.
*/
public synchronized void registerSealedTsFileResource(TsFileResource tsFileResource) {
- sealedTsFileResources.add(tsFileResource);
- totalTimeIndexMemCost += tsFileResource.calculateRamSize();
- chooseTsFileResourceToDegrade();
+ if (!sealedTsFileResources.contains(tsFileResource)) {
+ sealedTsFileResources.add(tsFileResource);
+ totalTimeIndexMemCost += tsFileResource.calculateRamSize();
+ chooseTsFileResourceToDegrade();
+ }
}
/** delete the TsFileResource in PriorityQueue when the source file is deleted */
public synchronized void removeTsFileResource(TsFileResource tsFileResource) {
- sealedTsFileResources.remove(tsFileResource);
- if (TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType())
- == TimeIndexLevel.FILE_TIME_INDEX) {
- totalTimeIndexMemCost -= tsFileResource.calculateRamSize();
- } else {
- totalTimeIndexMemCost -= tsFileResource.getRamSize();
+ if (sealedTsFileResources.contains(tsFileResource)) {
+ sealedTsFileResources.remove(tsFileResource);
+ if (TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType())
+ == TimeIndexLevel.FILE_TIME_INDEX) {
+ totalTimeIndexMemCost -= tsFileResource.calculateRamSize();
+ } else {
+ totalTimeIndexMemCost -= tsFileResource.getRamSize();
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
index 652591d..20e96ec 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
@@ -102,7 +102,8 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager);
+ tsFileManager,
+ 0L);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
@@ -135,6 +136,9 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
+ TsFileResource.RESOURCE_SUFFIX)
.exists());
}
+ Assert.assertEquals(4, tsFileManager.getSequenceListByTimePartition(0).size());
+ Assert.assertEquals(5, tsFileManager.getUnsequenceListByTimePartition(0).size());
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
}
@Test
@@ -171,7 +175,8 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager);
+ tsFileManager,
+ 0L);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
@@ -204,6 +209,9 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
+ TsFileResource.RESOURCE_SUFFIX)
.exists());
}
+ Assert.assertEquals(4, tsFileManager.getSequenceListByTimePartition(0).size());
+ Assert.assertEquals(5, tsFileManager.getUnsequenceListByTimePartition(0).size());
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
}
@Test
@@ -232,6 +240,15 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
compactionLogger.logFiles(unseqResources, STR_UNSEQ_FILES);
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ for (TsFileResource resource : seqResources) {
+ tsFileManager.getSequenceListByTimePartition(0).remove(resource);
+ }
+ for (TsFileResource resource : unseqResources) {
+ tsFileManager.getUnsequenceListByTimePartition(0).remove(resource);
+ }
+ for (TsFileResource resource : targetResources) {
+ tsFileManager.getSequenceListByTimePartition(0).keepOrderInsert(resource);
+ }
seqResources.get(0).getTsFile().delete();
compactionLogger.logStringInfo(MAGIC_STRING);
compactionLogger.close();
@@ -241,7 +258,8 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager);
+ tsFileManager,
+ 0L);
// all source file should not exist
for (TsFileResource resource : seqResources) {
Assert.assertFalse(resource.getTsFile().exists());
@@ -270,6 +288,9 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
Assert.assertTrue(
new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
}
+ Assert.assertEquals(4, tsFileManager.getSequenceListByTimePartition(0).size());
+ Assert.assertEquals(0, tsFileManager.getUnsequenceListByTimePartition(0).size());
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
}
/**
@@ -321,6 +342,15 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(i), false);
}
CompactionUtils.combineModsInCompaction(seqResources, unseqResources, targetResources);
+ for (TsFileResource resource : seqResources) {
+ tsFileManager.getSequenceListByTimePartition(0).remove(resource);
+ }
+ for (TsFileResource resource : unseqResources) {
+ tsFileManager.getUnsequenceListByTimePartition(0).remove(resource);
+ }
+ for (TsFileResource resource : targetResources) {
+ tsFileManager.getSequenceListByTimePartition(0).keepOrderInsert(resource);
+ }
seqResources.get(0).remove();
CrossSpaceCompactionExceptionHandler.handleException(
@@ -329,7 +359,8 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager);
+ tsFileManager,
+ 0L);
// All source file should not exist. All compaction mods file and old mods file of each source
// file should not exist
for (TsFileResource resource : seqResources) {
@@ -372,6 +403,8 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
// compaction log file should not exist
Assert.assertFalse(compactionLogFile.exists());
+ Assert.assertEquals(4, tsFileManager.getSequenceListByTimePartition(0).size());
+ Assert.assertEquals(0, tsFileManager.getUnsequenceListByTimePartition(0).size());
Assert.assertTrue(tsFileManager.isAllowCompaction());
}
@@ -431,7 +464,8 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager);
+ tsFileManager,
+ 0L);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
@@ -479,17 +513,19 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
for (TsFileResource resource : seqResources) {
resource.resetModFile();
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(2, resource.getModFile().getModifications().size());
+ Assert.assertEquals(1, resource.getModFile().getModifications().size());
}
for (TsFileResource resource : unseqResources) {
resource.resetModFile();
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(2, resource.getModFile().getModifications().size());
+ Assert.assertEquals(1, resource.getModFile().getModifications().size());
}
// compaction log file should not exist
Assert.assertFalse(compactionLogFile.exists());
+ Assert.assertEquals(4, tsFileManager.getSequenceListByTimePartition(0).size());
+ Assert.assertEquals(5, tsFileManager.getUnsequenceListByTimePartition(0).size());
Assert.assertTrue(tsFileManager.isAllowCompaction());
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
index 8cec995..6f4badb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
@@ -147,6 +147,8 @@ public class RewriteCompactionFileSelectorTest extends MergeTest {
new RewriteCompactionFileSelector(resource, Long.MAX_VALUE);
List[] result = mergeFileSelector.select();
assertEquals(2, result.length);
+ assertEquals(5, result[0].size());
+ assertEquals(1, result[1].size());
resource.clear();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
index bdbbc0b..f8a799e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
@@ -485,12 +485,12 @@ public class RewriteCrossSpaceCompactionRecoverTest extends AbstractCompactionTe
for (TsFileResource resource : seqResources) {
resource.resetModFile();
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(2, resource.getModFile().getModifications().size());
+ Assert.assertEquals(1, resource.getModFile().getModifications().size());
}
for (TsFileResource resource : unseqResources) {
resource.resetModFile();
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(2, resource.getModFile().getModifications().size());
+ Assert.assertEquals(1, resource.getModFile().getModifications().size());
}
// compaction log file should not exist
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
index 21692da..7d2bb87 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
@@ -31,8 +31,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.h2.store.fs.FileUtils;
import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
@@ -42,8 +40,6 @@ import java.util.HashMap;
import java.util.Map;
public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompactionTest {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(InnerSpaceCompactionExceptionTest.class);
/**
* Test when all source files exist, and target file is not complete. System should delete target
@@ -53,7 +49,6 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
*/
@Test
public void testWhenAllSourceExistsAndTargetNotComplete() throws Exception {
- LOGGER.error("Running testWhenAllSourceExistsAndTargetNotComplete");
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource targetResource =
@@ -101,7 +96,6 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
*/
@Test
public void testWhenAllSourceExistsAndTargetComplete() throws Exception {
- LOGGER.error("Running testWhenAllSourceExistsAndTargetComplete");
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource targetResource =
@@ -145,7 +139,6 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
*/
@Test
public void testWhenSomeSourceLostAndTargetComplete() throws Exception {
- LOGGER.error("Running testWhenSomeSourceLostAndTargetComplete");
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource targetResource =
@@ -162,6 +155,10 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
InnerSpaceCompactionUtils.compact(targetResource, seqResources);
InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
+ for (TsFileResource resource : seqResources) {
+ tsFileManager.getSequenceListByTimePartition(0).remove(resource);
+ }
+ tsFileManager.getSequenceListByTimePartition(0).keepOrderInsert(targetResource);
FileUtils.delete(seqResources.get(0).getTsFile().getPath());
seqResources.get(0).remove();
compactionLogger.close();
@@ -196,7 +193,6 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
*/
@Test
public void testWhenSomeSourceLostAndTargetNotComplete() throws Exception {
- LOGGER.error("Running testWhenSomeSourceLostAndTargetNotComplete");
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource targetResource =
@@ -249,7 +245,6 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
*/
@Test
public void testHandleWithCompactionMods() throws Exception {
- LOGGER.error("Running testHandleWithCompactionMods");
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource targetResource =
@@ -315,7 +310,6 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
*/
@Test
public void testHandleWithNormalMods() throws Exception {
- LOGGER.error("Running testHandleWithNormalMods");
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource targetResource =
@@ -374,7 +368,6 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
*/
@Test
public void testHandleWithCompactionModsAndNormalMods() throws Exception {
- LOGGER.error("Running testHandleWithCompactionModsAndNormalMods");
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
TsFileResource targetResource =
@@ -404,6 +397,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
deviceIds[0] + "." + measurementSchemas[0].getMeasurementId(),
new Pair<>(i * ptNum + 10, i * ptNum + 15));
CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), true);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), false);
}
compactionLogger.close();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
index 99521e2..9482036 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
@@ -524,7 +524,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
for (int i = 0; i < seqResources.size(); i++) {
seqResources.get(i).resetModFile();
Assert.assertTrue(seqResources.get(i).getModFile().exists());
- Assert.assertEquals(2, seqResources.get(i).getModFile().getModifications().size());
+ Assert.assertEquals(1, seqResources.get(i).getModFile().getModifications().size());
}
// mods file of the target file should not exist
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index f97f454..f18d81c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -638,7 +638,7 @@ public class StorageGroupProcessorTest {
TriggerExecutionException {
int originCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
- IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(10);
+ IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(9);
boolean originEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
boolean originEnableUnseqSpaceCompaction =
@@ -687,7 +687,7 @@ public class StorageGroupProcessorTest {
context,
null,
null);
- Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+ Assert.assertEquals(2, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
}