You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/12/09 09:25:46 UTC
[iotdb] branch rel/0.12 updated: [IOTDB-2128] [To rel/0.12] Simplify compaction recover (#4546)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 7e61462 [IOTDB-2128] [To rel/0.12] Simplify compaction recover (#4546)
7e61462 is described below
commit 7e6146255ff6d1c7330eaf3404f20bdc2987c0eb
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Thu Dec 9 17:25:11 2021 +0800
[IOTDB-2128] [To rel/0.12] Simplify compaction recover (#4546)
---
.../db/engine/compaction/TsFileManagement.java | 21 +--
.../level/LevelCompactionTsFileManagement.java | 166 ++++++---------------
.../no/NoCompactionTsFileManagement.java | 7 +-
.../engine/compaction/utils/CompactionUtils.java | 2 +-
.../merge/task/CompactionMergeRecoverTask.java | 13 +-
.../engine/storagegroup/StorageGroupProcessor.java | 19 +--
.../compaction/LevelCompactionRecoverTest.java | 145 ------------------
7 files changed, 65 insertions(+), 308 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 9229822..a2f18b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -121,9 +121,6 @@ public abstract class TsFileManagement {
/** add one TsFile to list */
public abstract void add(TsFileResource tsFileResource, boolean sequence) throws IOException;
- /** add one TsFile to list for recover */
- public abstract void addRecover(TsFileResource tsFileResource, boolean sequence);
-
/** add some TsFiles to list */
public abstract void addAll(List<TsFileResource> tsFileResourceList, boolean sequence)
throws IOException;
@@ -141,7 +138,7 @@ public abstract class TsFileManagement {
public abstract int size(boolean sequence);
/** recover TsFile list */
- public abstract boolean recover();
+ public abstract void recover();
/** fork current TsFile list (call this before merge) */
public abstract void forkCurrentFileList(long timePartition) throws IOException;
@@ -190,25 +187,13 @@ public abstract class TsFileManagement {
public class CompactionRecoverTask extends StorageGroupCompactionTask {
- private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
-
- public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+ public CompactionRecoverTask() {
super(storageGroupName);
- this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
}
@Override
public Void call() {
- boolean recoverSuccess = recover();
- if (recoverSuccess) {
- // in recover logic, the param time partition is useless, we can just pass 0L
- closeCompactionMergeCallBack.call(false, 0L);
- } else {
- logger.warn(
- "{}-{} [Compaction] failed to recover compaction, this storage group will not compact periodically",
- storageGroupName,
- virtualStorageGroupId);
- }
+ recover();
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 33a433f..aabb674 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-import org.h2.store.fs.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +47,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -85,8 +83,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private final Map<Long, List<List<TsFileResource>>> unSequenceTsFileResources = new TreeMap<>();
private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
- private final List<TsFileResource> sequenceRecoverTsFileResources = new ArrayList<>();
- private final List<TsFileResource> unSequenceRecoverTsFileResources = new ArrayList<>();
public LevelCompactionTsFileManagement(
String storageGroupName, String virtualStorageGroupId, String storageGroupDir) {
@@ -305,15 +301,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
@Override
- public void addRecover(TsFileResource tsFileResource, boolean sequence) {
- if (sequence) {
- sequenceRecoverTsFileResources.add(tsFileResource);
- } else {
- unSequenceRecoverTsFileResources.add(tsFileResource);
- }
- }
-
- @Override
public void addAll(List<TsFileResource> tsFileResourceList, boolean sequence) throws IOException {
writeLock();
try {
@@ -422,7 +409,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
/** recover files */
@Override
@SuppressWarnings("squid:S3776")
- public boolean recover() {
+ public void recover() {
File logFile =
FSFactoryProducer.getFSFactory()
.getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
@@ -430,115 +417,74 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (logFile.exists()) {
CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
logAnalyzer.analyze();
- Set<String> deviceSet = logAnalyzer.getDeviceSet();
List<CompactionFileInfo> sourceFileInfo = logAnalyzer.getSourceFileInfo();
CompactionFileInfo targetFileInfo = logAnalyzer.getTargetFileInfo();
String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
File targetFile = null;
boolean isSeq = logAnalyzer.isSeq();
if (targetFileInfo == null || sourceFileInfo.isEmpty()) {
- return true;
+ return;
}
- if (deviceSet.isEmpty() && targetFileInfo != null) {
- // if not in compaction, just delete the target file
- for (String dataDir : dataDirs) {
- targetFile = targetFileInfo.getFile(dataDir);
- if (targetFile.exists()) {
- logger.info(
- "[Compaction][Recover] Target file {} found, device set is null, delete it",
- targetFile);
- FileUtils.delete(targetFile.getPath());
- return true;
- }
+ // locate the target file on disk by probing each data dir and wrap it in a new TsFileResource
+ TsFileResource targetResource = null;
+ for (String dataDir : dataDirs) {
+ if ((targetFile = targetFileInfo.getFile(dataDir)).exists()) {
+ targetResource = new TsFileResource(targetFile);
}
}
- // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
- TsFileResource targetResource =
- getResourceFromDataDirs(true, dataDirs, isSeq, targetFileInfo);
if (targetResource != null) {
- // target tsfile is not compeleted
- logger.info(
- "[Compaction][Recover] target file {} is not compeleted, remove it", targetResource);
- targetResource.remove();
- if (isSeq) {
- sequenceRecoverTsFileResources.clear();
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile);
+ if (writer.hasCrashed()) {
+ // target tsfile is not completed
+ writer.close();
+ logger.info(
+ "[Compaction][Recover] target file {} is not complete, remove it", targetResource);
+ targetResource.remove();
} else {
- unSequenceRecoverTsFileResources.clear();
- }
- } else if ((targetResource =
- getResourceFromDataDirs(false, dataDirs, isSeq, targetFileInfo))
- != null) {
- // complete compaction, delete source files
- logger.info(
- "[Compaction][Recover] target file {} is compeleted, remove resource file",
- targetResource);
- long timePartition = targetResource.getTimePartition();
- List<TsFileResource> sourceTsFileResources = new ArrayList<>();
- for (CompactionFileInfo sourceInfo : sourceFileInfo) {
- // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
- TsFileResource sourceTsFileResource =
- getResourceFromDataDirs(false, dataDirs, isSeq, sourceInfo);
- if (sourceTsFileResource == null) {
- // if sourceTsFileResource is null, it has been deleted
- continue;
- }
- sourceTsFileResources.add(sourceTsFileResource);
- }
- if (sourceFileInfo.size() != 0) {
- List<Modification> modifications = new ArrayList<>();
- // if not complete compaction, remove target file
- writeLock();
- try {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException(
- String.format("%s [Compaction] abort", storageGroupName));
+ writer.close();
+ // complete compaction, delete source files
+ logger.info(
+ "[Compaction][Recover] target file {} is compeleted, remove resource file",
+ targetResource);
+ List<TsFileResource> sourceTsFileResources = new ArrayList<>();
+ for (CompactionFileInfo sourceInfo : sourceFileInfo) {
+ // locate the source file on disk by probing each data dir and wrap it in a new TsFileResource
+ File sourceFile = null;
+ TsFileResource sourceTsFileResource = null;
+ for (String dataDir : dataDirs) {
+ if ((sourceFile = sourceInfo.getFile(dataDir)).exists()) {
+ sourceTsFileResource = new TsFileResource(sourceFile);
+ break;
+ }
}
- int level = TsFileResource.getMergeLevel(sourceFileInfo.get(0).getFilename());
- deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
- } finally {
- writeUnlock();
+ if (sourceTsFileResource == null) {
+ // if sourceTsFileResource is null, it has been deleted
+ continue;
+ }
+ sourceTsFileResources.add(sourceTsFileResource);
}
- for (TsFileResource tsFileResource : sourceTsFileResources) {
- logger.info(
- "{} recover storage group delete source file {}",
- storageGroupName,
- tsFileResource.getTsFile().getName());
+ if (sourceFileInfo.size() != 0) {
+ List<Modification> modifications = new ArrayList<>();
+ // compaction is complete: delete the source files, then process their mods via renameLevelFilesMods
+ for (TsFileResource tsFileResource : sourceTsFileResources) {
+ logger.info(
+ "{} recover storage group delete source file {}",
+ storageGroupName,
+ tsFileResource.getTsFile().getName());
+ }
+ deleteLevelFilesInDisk(sourceTsFileResources);
+ renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
}
- deleteLevelFilesInDisk(sourceTsFileResources);
- renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
}
}
}
if (logFile.exists()) {
Files.delete(logFile.toPath());
}
- return true;
} catch (Throwable e) {
logger.error("exception occurs during recovering compaction", e);
canMerge = false;
- return false;
- }
- }
-
- private TsFileResource getResourceFromDataDirs(
- boolean recover, String[] dataDirs, boolean isSeq, CompactionFileInfo info)
- throws IOException {
- TsFileResource foundResource = null;
- if (recover) {
- for (String dataDir : dataDirs) {
- if ((foundResource = getRecoverTsFileResource(info.getFile(dataDir).getPath(), isSeq))
- != null) {
- return foundResource;
- }
- }
- } else {
- for (String dataDir : dataDirs) {
- if ((foundResource = getTsFileResource(info.getFile(dataDir).getPath(), isSeq)) != null) {
- return foundResource;
- }
- }
}
- return null;
}
@Override
@@ -795,28 +741,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
return newUnSequenceTsFileResources;
}
- private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq)
- throws IOException {
- try {
- if (isSeq) {
- for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
- if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
- return tsFileResource;
- }
- }
- } else {
- for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
- if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
- return tsFileResource;
- }
- }
- }
- } catch (IOException e) {
- logger.error("cannot get tsfile resource path: {}", filePath);
- }
- return null;
- }
-
private TsFileResource getTsFileResource(String filePath, boolean isSeq) {
readLock();
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index d738a01..e50a1d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -174,9 +174,6 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
}
@Override
- public void addRecover(TsFileResource tsFileResource, boolean sequence) {}
-
- @Override
public void addAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
writeLock();
try {
@@ -261,9 +258,7 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
}
@Override
- public boolean recover() {
- return true;
- }
+ public void recover() {}
@Override
public void forkCurrentFileList(long timePartition) {}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 3a3cab9..936a451 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -428,8 +428,8 @@ public class CompactionUtils {
for (TsFileResource tsFileResource : tsFileResources) {
targetResource.updatePlanIndexes(tsFileResource);
}
- targetResource.serialize();
writer.endFile();
+ targetResource.serialize();
targetResource.close();
} finally {
if (writer.canWrite()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
index 871961d..ad66344 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
@@ -34,11 +34,10 @@ public class CompactionMergeRecoverTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(CompactionMergeRecoverTask.class);
- private TsFileManagement.CompactionRecoverTask compactionRecoverTask;
private RecoverMergeTask recoverMergeTask;
private TsFileManagement tsFileManagement;
- private String storageGroupSysDir;
private String storageGroupName;
+ private StorageGroupProcessor.CloseCompactionMergeCallBack closeCompactionMergeCallBack;
public CompactionMergeRecoverTask(
TsFileManagement tsFileManagement,
@@ -51,10 +50,8 @@ public class CompactionMergeRecoverTask implements Runnable {
String storageGroupName,
StorageGroupProcessor.CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
this.tsFileManagement = tsFileManagement;
- this.compactionRecoverTask =
- this.tsFileManagement.new CompactionRecoverTask(closeCompactionMergeCallBack);
- this.storageGroupSysDir = storageGroupSysDir;
this.storageGroupName = storageGroupName;
+ this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
this.recoverMergeTask =
new RecoverMergeTask(
seqFiles,
@@ -73,9 +70,13 @@ public class CompactionMergeRecoverTask implements Runnable {
recoverMergeTask.recoverMerge(true);
} catch (MetadataException | IOException e) {
logger.error(e.getMessage(), e);
+ tsFileManagement.canMerge = false;
+ logger.warn(
+ "{} [Compaction] Exception occurs while recovering merge, set can merge to false",
+ storageGroupName);
}
- compactionRecoverTask.call();
tsFileManagement.recovered = true;
+ this.closeCompactionMergeCallBack.call(false, -1);
logger.info("{} Compaction recover finish", storageGroupName);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 8361382..705a816 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -419,6 +419,8 @@ public class StorageGroupProcessor {
logger.info("recover Storage Group {}", logicalStorageGroupName + "-" + virtualStorageGroupId);
try {
+ // recover inner space compaction
+ this.tsFileManagement.new CompactionRecoverTask().call();
// collect candidate TsFiles from sequential and unsequential data directory
Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
@@ -670,14 +672,8 @@ public class StorageGroupProcessor {
try {
// this tsfile is not zero level, no need to perform redo wal
if (TsFileResource.getMergeLevel(tsFileResource.getTsFile().getName()) > 0) {
- writer =
- recoverPerformer.recover(false, this::getWalDirectByteBuffer, this::releaseWalBuffer);
- if (writer.hasCrashed()) {
- tsFileManagement.addRecover(tsFileResource, isSeq);
- } else {
- tsFileResource.setClosed(true);
- tsFileManagement.add(tsFileResource, isSeq);
- }
+ tsFileResource.setClosed(true);
+ tsFileManagement.add(tsFileResource, isSeq);
continue;
} else {
writer =
@@ -2023,9 +2019,10 @@ public class StorageGroupProcessor {
}
/** close recover compaction merge callback, to start continuous compaction */
- private void closeCompactionRecoverCallBack(boolean isMerge, long timePartitionId) {
+ private void closeCompactionRecoverCallBack(boolean isMerging, long timePartition) {
if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
- == CompactionStrategy.NO_COMPACTION) {
+ == CompactionStrategy.NO_COMPACTION
+ || !this.tsFileManagement.canMerge) {
return;
}
CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(logicalStorageGroupName);
@@ -2141,7 +2138,7 @@ public class StorageGroupProcessor {
}
public void merge() {
- if (!tsFileManagement.recovered || compacting) {
+ if (!tsFileManagement.recovered || compacting || !tsFileManagement.canMerge) {
// recovering or doing compaction
// stop running new compaction
return;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index 0d2d64f..57633bb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -51,7 +51,6 @@ import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
import org.apache.commons.io.FileUtils;
import org.junit.After;
@@ -59,11 +58,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -248,146 +243,6 @@ public class LevelCompactionRecoverTest {
FileUtils.deleteDirectory(tempSGDir);
}
- // uncompeleted target file and log
- /** compaction recover merge finished, delete one device - offset */
- @Test
- public void testCompactionRecoverWithUncompletedTargetFileAndLog()
- throws IOException, IllegalPathException {
- LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
- levelCompactionTsFileManagement.addAll(seqResources, true);
- levelCompactionTsFileManagement.addAll(unseqResources, false);
- QueryContext context = new QueryContext();
- PartialPath path =
- new PartialPath(
- deviceIds[0]
- + TsFileConstant.PATH_SEPARATOR
- + measurementSchemas[0].getMeasurementId());
- ChunkCache.getInstance().clear();
- TimeSeriesMetadataCache.getInstance().clear();
- IBatchReader tsFilesReader =
- new SeriesRawDataBatchReader(
- path,
- measurementSchemas[0].getType(),
- context,
- levelCompactionTsFileManagement.getTsFileList(true),
- new ArrayList<>(),
- null,
- null,
- true);
- int count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
- count++;
- }
- }
- tsFilesReader.close();
- assertEquals(600, count);
-
- CompactionLogger compactionLogger =
- new CompactionLogger(tempSGDir.getPath(), COMPACTION_TEST_SG);
- compactionLogger.logFile(SOURCE_NAME, seqResources.get(0).getTsFile());
- compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
- compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
- compactionLogger.logSequence(true);
- deleteFileIfExists(
- new File(
- TestConstant.SEQUENCE_DATA_DIR.concat(
- 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 1
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- TsFileResource targetTsFileResource =
- new TsFileResource(
- new File(
- TestConstant.SEQUENCE_DATA_DIR.concat(
- 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 1
- + IoTDBConstant.FILE_NAME_SEPARATOR
- + 0
- + ".tsfile")));
- compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
- CompactionUtils.merge(
- targetTsFileResource,
- new ArrayList<>(seqResources.subList(0, 3)),
- COMPACTION_TEST_SG,
- compactionLogger,
- new HashSet<>(),
- true,
- new ArrayList<>(),
- null);
- compactionLogger.close();
-
- BufferedReader logReader =
- new BufferedReader(
- new FileReader(
- SystemFileFactory.INSTANCE.getFile(
- tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME)));
- List<String> logs = new ArrayList<>();
- String line;
- while ((line = logReader.readLine()) != null) {
- logs.add(line);
- }
- logReader.close();
- BufferedWriter logStream =
- new BufferedWriter(
- new FileWriter(
- SystemFileFactory.INSTANCE.getFile(
- tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME),
- false));
- for (int i = 0; i < logs.size() - 1; i++) {
- logStream.write(logs.get(i));
- logStream.newLine();
- }
- logStream.close();
-
- TsFileOutput out =
- FSFactoryProducer.getFileOutputFactory()
- .getTsFileOutput(targetTsFileResource.getTsFile().getPath(), true);
- out.truncate(Long.parseLong(logs.get(logs.size() - 1).split(" ")[1]) - 1);
- out.close();
-
- levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
- levelCompactionTsFileManagement.recover();
- context = new QueryContext();
- path =
- new PartialPath(
- deviceIds[0]
- + TsFileConstant.PATH_SEPARATOR
- + measurementSchemas[0].getMeasurementId());
- ChunkCache.getInstance().clear();
- TimeSeriesMetadataCache.getInstance().clear();
- tsFilesReader =
- new SeriesRawDataBatchReader(
- path,
- measurementSchemas[0].getType(),
- context,
- levelCompactionTsFileManagement.getTsFileList(true),
- new ArrayList<>(),
- null,
- null,
- true);
- count = 0;
- while (tsFilesReader.hasNextBatch()) {
- BatchData batchData = tsFilesReader.nextBatch();
- for (int i = 0; i < batchData.length(); i++) {
- assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
- count++;
- }
- }
- tsFilesReader.close();
- assertEquals(600, count);
- }
-
/** compaction recover merge finished */
@Test
public void testRecoverCompleteTargetFileAndCompactionLog()