You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/29 10:03:21 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11] compaction not block
flush (#2364)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 093a9e5 [To rel/0.11] compaction not block flush (#2364)
093a9e5 is described below
commit 093a9e5141fa0bcd50f79c76998e4d292c18c800
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Tue Dec 29 18:03:07 2020 +0800
[To rel/0.11] compaction not block flush (#2364)
* compaction_not_block_flush_to_0.11
* remove useless
Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
.../compaction/CompactionMergeTaskPoolManager.java | 26 ++
.../db/engine/compaction/TsFileManagement.java | 5 -
.../level/LevelCompactionTsFileManagement.java | 319 ++++++++++++---------
.../no/NoCompactionTsFileManagement.java | 5 -
.../engine/compaction/utils/CompactionUtils.java | 4 +-
.../engine/storagegroup/StorageGroupProcessor.java | 19 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 3 +
7 files changed, 213 insertions(+), 168 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 3473d54..218be58 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.engine.compaction;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
+
+import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -27,6 +30,9 @@ import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +80,26 @@ public class CompactionMergeTaskPoolManager implements IService {
}
}
+ @TestOnly
+ public void waitAllCompactionFinish() {
+ if (pool != null) {
+ File sgDir = FSFactoryProducer.getFSFactory().getFile(
+ FilePathUtils.regularizePath(IoTDBDescriptor.getInstance().getConfig().getSystemDir())
+ + "storage_groups");
+ File[] subDirList = sgDir.listFiles();
+ if(subDirList!=null) {
+ for (File subDir : subDirList) {
+ while (FSFactoryProducer.getFSFactory().getFile(
+ subDir.getAbsoluteFile() + File.separator + subDir.getName() + COMPACTION_LOG_NAME)
+ .exists()) {
+ // wait
+ }
+ }
+ }
+ logger.info("All compaction task finish");
+ }
+ }
+
private void waitTermination() {
long startTime = System.currentTimeMillis();
while (!pool.isTerminated()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index f7f68c3..096abec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -74,11 +74,6 @@ public abstract class TsFileManagement {
}
/**
- * get the TsFile list which has been completed hot compacted
- */
- public abstract List<TsFileResource> getStableTsFileList(boolean sequence);
-
- /**
* get the TsFile list in sequence
*/
public abstract List<TsFileResource> getTsFileList(boolean sequence);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index e6ff2ec..33e4e11 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -31,21 +31,24 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -60,21 +63,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private static final Logger logger = LoggerFactory
.getLogger(LevelCompactionTsFileManagement.class);
-
- private final int seqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
- private final int seqFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig()
- .getSeqFileNumInEachLevel();
- private final int unseqLevelNum = IoTDBDescriptor.getInstance().getConfig()
- .getUnseqLevelNum();
- private final int unseqFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig()
- .getSeqFileNumInEachLevel();
+ private final int seqLevelNum = Math
+ .max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
+ private final int seqFileNumInEachLevel = Math
+ .max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
+ private final int unseqLevelNum = Math
+ .max(IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum(), 1);
+ private final int unseqFileNumInEachLevel = Math
+ .max(IoTDBDescriptor.getInstance().getConfig().getUnseqFileNumInEachLevel(), 1);
private final boolean enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig()
.isEnableUnseqCompaction();
private final boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig()
.isForceFullMerge();
// First map is partition list; Second list is level list; Third list is file list in level;
- private final Map<Long, List<TreeSet<TsFileResource>>> sequenceTsFileResources = new ConcurrentSkipListMap<>();
+ private final Map<Long, List<SortedSet<TsFileResource>>> sequenceTsFileResources = new ConcurrentSkipListMap<>();
private final Map<Long, List<List<TsFileResource>>> unSequenceTsFileResources = new ConcurrentSkipListMap<>();
private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
@@ -99,13 +102,17 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (sequence) {
if (sequenceTsFileResources.containsKey(timePartitionId)) {
if (sequenceTsFileResources.get(timePartitionId).size() > level) {
- sequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
+ synchronized (sequenceTsFileResources) {
+ sequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
+ }
}
}
} else {
if (unSequenceTsFileResources.containsKey(timePartitionId)) {
if (unSequenceTsFileResources.get(timePartitionId).size() > level) {
- unSequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
+ synchronized (unSequenceTsFileResources) {
+ unSequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
+ }
}
}
}
@@ -126,33 +133,23 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
@Override
- public List<TsFileResource> getStableTsFileList(boolean sequence) {
- List<TsFileResource> result = new ArrayList<>();
- if (sequence) {
- for (List<TreeSet<TsFileResource>> sequenceTsFileList : sequenceTsFileResources.values()) {
- result.addAll(sequenceTsFileList.get(seqLevelNum - 1));
- }
- } else {
- for (List<List<TsFileResource>> unSequenceTsFileList : unSequenceTsFileResources.values()) {
- result.addAll(unSequenceTsFileList.get(unseqLevelNum - 1));
- }
- }
- return result;
- }
-
- @Override
public List<TsFileResource> getTsFileList(boolean sequence) {
List<TsFileResource> result = new ArrayList<>();
if (sequence) {
- for (List<TreeSet<TsFileResource>> sequenceTsFileList : sequenceTsFileResources.values()) {
- for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
- result.addAll(sequenceTsFileList.get(i));
+ synchronized (sequenceTsFileResources) {
+ for (List<SortedSet<TsFileResource>> sequenceTsFileList : sequenceTsFileResources
+ .values()) {
+ for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
+ result.addAll(sequenceTsFileList.get(i));
+ }
}
}
} else {
- for (List<List<TsFileResource>> unSequenceTsFileList : unSequenceTsFileResources.values()) {
- for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
- result.addAll(unSequenceTsFileList.get(i));
+ synchronized (unSequenceTsFileResources) {
+ for (List<List<TsFileResource>> unSequenceTsFileList : unSequenceTsFileResources.values()) {
+ for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
+ result.addAll(unSequenceTsFileList.get(i));
+ }
}
}
}
@@ -167,14 +164,18 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
@Override
public void remove(TsFileResource tsFileResource, boolean sequence) {
if (sequence) {
- for (TreeSet<TsFileResource> sequenceTsFileResource : sequenceTsFileResources
- .get(tsFileResource.getTimePartition())) {
- sequenceTsFileResource.remove(tsFileResource);
+ synchronized (sequenceTsFileResources) {
+ for (SortedSet<TsFileResource> sequenceTsFileResource : sequenceTsFileResources
+ .get(tsFileResource.getTimePartition())) {
+ sequenceTsFileResource.remove(tsFileResource);
+ }
}
} else {
- for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources
- .get(tsFileResource.getTimePartition())) {
- unSequenceTsFileResource.remove(tsFileResource);
+ synchronized (unSequenceTsFileResources) {
+ for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources
+ .get(tsFileResource.getTimePartition())) {
+ unSequenceTsFileResource.remove(tsFileResource);
+ }
}
}
}
@@ -182,17 +183,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
@Override
public void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
if (sequence) {
- for (List<TreeSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
- .values()) {
- for (TreeSet<TsFileResource> levelTsFileResource : partitionSequenceTsFileResource) {
- levelTsFileResource.removeAll(tsFileResourceList);
+ synchronized (sequenceTsFileResources) {
+ for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
+ .values()) {
+ for (SortedSet<TsFileResource> levelTsFileResource : partitionSequenceTsFileResource) {
+ levelTsFileResource.removeAll(tsFileResourceList);
+ }
}
}
} else {
- for (List<List<TsFileResource>> partitionUnSequenceTsFileResource : unSequenceTsFileResources
- .values()) {
- for (List<TsFileResource> levelTsFileResource : partitionUnSequenceTsFileResource) {
- levelTsFileResource.removeAll(tsFileResourceList);
+ synchronized (unSequenceTsFileResources) {
+ for (List<List<TsFileResource>> partitionUnSequenceTsFileResource : unSequenceTsFileResources
+ .values()) {
+ for (List<TsFileResource> levelTsFileResource : partitionUnSequenceTsFileResource) {
+ levelTsFileResource.removeAll(tsFileResourceList);
+ }
}
}
}
@@ -203,28 +208,33 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
long timePartitionId = tsFileResource.getTimePartition();
int level = getMergeLevel(tsFileResource.getTsFile());
if (sequence) {
- if (level <= seqLevelNum - 1) {
- // current file has too high level
- sequenceTsFileResources
- .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(level)
- .add(tsFileResource);
- } else {
- // current file has normal level
- sequenceTsFileResources
- .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(seqLevelNum - 1)
- .add(tsFileResource);
+ synchronized (sequenceTsFileResources) {
+ if (level <= seqLevelNum - 1) {
+ // current file has normal level
+ sequenceTsFileResources
+ .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(level)
+ .add(tsFileResource);
+ } else {
+ // current file has too high level
+ sequenceTsFileResources
+ .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
+ .get(seqLevelNum - 1)
+ .add(tsFileResource);
+ }
}
} else {
- if (level <= unseqLevelNum - 1) {
- // current file has too high level
- unSequenceTsFileResources
- .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources).get(level)
- .add(tsFileResource);
- } else {
- // current file has normal level
- unSequenceTsFileResources
- .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources)
- .get(unseqLevelNum - 1).add(tsFileResource);
+ synchronized (unSequenceTsFileResources) {
+ if (level <= unseqLevelNum - 1) {
+ // current file has normal level
+ unSequenceTsFileResources
+ .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources).get(level)
+ .add(tsFileResource);
+ } else {
+ // current file has too high level
+ unSequenceTsFileResources
+ .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources)
+ .get(unseqLevelNum - 1).add(tsFileResource);
+ }
}
}
}
@@ -239,7 +249,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
@Override
public boolean contains(TsFileResource tsFileResource, boolean sequence) {
if (sequence) {
- for (TreeSet<TsFileResource> sequenceTsFileResource : sequenceTsFileResources
+ for (SortedSet<TsFileResource> sequenceTsFileResource : sequenceTsFileResources
.computeIfAbsent(tsFileResource.getTimePartition(), this::newSequenceTsFileResources)) {
if (sequenceTsFileResource.contains(tsFileResource)) {
return true;
@@ -266,9 +276,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
@SuppressWarnings("squid:S3776")
public boolean isEmpty(boolean sequence) {
if (sequence) {
- for (List<TreeSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
+ for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
.values()) {
- for (TreeSet<TsFileResource> sequenceTsFileResource : partitionSequenceTsFileResource) {
+ for (SortedSet<TsFileResource> sequenceTsFileResource : partitionSequenceTsFileResource) {
if (!sequenceTsFileResource.isEmpty()) {
return false;
}
@@ -291,7 +301,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
public int size(boolean sequence) {
int result = 0;
if (sequence) {
- for (List<TreeSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
+ for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
.values()) {
for (int i = seqLevelNum - 1; i >= 0; i--) {
result += partitionSequenceTsFileResource.get(i).size();
@@ -324,75 +334,71 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
List<String> sourceFileList = logAnalyzer.getSourceFiles();
long offset = logAnalyzer.getOffset();
String targetFile = logAnalyzer.getTargetFile();
- boolean isMergeFinished = logAnalyzer.isMergeFinished();
boolean fullMerge = logAnalyzer.isFullMerge();
boolean isSeq = logAnalyzer.isSeq();
- if (targetFile == null) {
+ if (targetFile == null || sourceFileList.isEmpty()) {
+ return;
+ }
+ File target = new File(targetFile);
+ if (deviceSet.isEmpty()) {
+ // if not in compaction, just delete the target file
+ if (target.exists()) {
+ Files.delete(target.toPath());
+ }
return;
}
if (fullMerge) {
- if (!isMergeFinished) {
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(new File(targetFile));
- writer.getIOWriterOut().truncate(offset - 1);
+ // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
+ TsFileResource targetTsFileResource = getTsFileResource(targetFile, isSeq);
+ long timePartition = targetTsFileResource.getTimePartition();
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+ // if not complete compaction, resume merge
+ if (writer.hasCrashed()) {
+ if (offset > 0) {
+ writer.getIOWriterOut().truncate(offset - 1);
+ }
writer.close();
- TsFileResource targetTsFileResource = getTsFileResource(targetFile, isSeq);
- long timePartition = targetTsFileResource.getTimePartition();
+ CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
+ storageGroupName);
CompactionUtils
.merge(targetTsFileResource, getTsFileList(isSeq), storageGroupName,
- new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, isSeq);
- if (isSeq) {
- for (int level = 0; level < sequenceTsFileResources.get(timePartition).size();
- level++) {
- TreeSet<TsFileResource> currLevelMergeFile = sequenceTsFileResources
- .get(timePartition).get(level);
- deleteLevelFilesInDisk(currLevelMergeFile);
- deleteLevelFilesInList(timePartition, currLevelMergeFile, level, isSeq);
- }
- } else {
- for (int level = 0; level < unSequenceTsFileResources.get(timePartition).size();
- level++) {
- TreeSet<TsFileResource> currLevelMergeFile = sequenceTsFileResources
- .get(timePartition).get(level);
- deleteLevelFilesInDisk(currLevelMergeFile);
- deleteLevelFilesInList(timePartition, currLevelMergeFile, level, isSeq);
- }
- }
+ compactionLogger, deviceSet, isSeq);
+ compactionLogger.close();
+ } else {
+ writer.close();
}
+ // complete compaction and delete source file
+ deleteAllSubLevelFiles(isSeq, timePartition);
} else {
+ // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
TsFileResource targetResource = getTsFileResource(targetFile, isSeq);
long timePartition = targetResource.getTimePartition();
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(new File(targetFile));
List<TsFileResource> sourceTsFileResources = new ArrayList<>();
for (String file : sourceFileList) {
+ // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
sourceTsFileResources.add(getTsFileResource(file, isSeq));
}
- if (sourceFileList.isEmpty()) {
- return;
- }
int level = getMergeLevel(new File(sourceFileList.get(0)));
- if (!isMergeFinished) {
- if (deviceSet.isEmpty()) {
- Files.delete(new File(targetFile).toPath());
- } else {
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+ // if not complete compaction, resume merge
+ if (writer.hasCrashed()) {
+ if (offset > 0) {
writer.getIOWriterOut().truncate(offset - 1);
- writer.close();
- if (isSeq) {
- CompactionUtils
- .merge(targetResource, sourceTsFileResources, storageGroupName,
- new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, true);
- deleteLevelFilesInDisk(sourceTsFileResources);
- deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
- sequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
- } else {
- CompactionUtils
- .merge(targetResource, sourceTsFileResources, storageGroupName,
- new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, false);
- deleteLevelFilesInDisk(sourceTsFileResources);
- deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
- unSequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
- }
}
+ writer.close();
+ CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
+ storageGroupName);
+ CompactionUtils
+ .merge(targetResource, sourceTsFileResources, storageGroupName,
+ compactionLogger, deviceSet,
+ isSeq);
+ compactionLogger.close();
+ } else {
+ writer.close();
}
+ // complete compaction and delete source file
+ deleteLevelFilesInDisk(sourceTsFileResources);
+ deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
}
}
} catch (IOException e) {
@@ -408,18 +414,42 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
}
+ private void deleteAllSubLevelFiles(boolean isSeq, long timePartition) {
+ if (isSeq) {
+ for (int level = 0; level < sequenceTsFileResources.get(timePartition).size();
+ level++) {
+ SortedSet<TsFileResource> currLevelMergeFile = sequenceTsFileResources
+ .get(timePartition).get(level);
+ deleteLevelFilesInDisk(currLevelMergeFile);
+ deleteLevelFilesInList(timePartition, currLevelMergeFile, level, isSeq);
+ }
+ } else {
+ for (int level = 0; level < unSequenceTsFileResources.get(timePartition).size();
+ level++) {
+ SortedSet<TsFileResource> currLevelMergeFile = sequenceTsFileResources
+ .get(timePartition).get(level);
+ deleteLevelFilesInDisk(currLevelMergeFile);
+ deleteLevelFilesInList(timePartition, currLevelMergeFile, level, isSeq);
+ }
+ }
+ }
+
@Override
public void forkCurrentFileList(long timePartition) {
- forkTsFileList(
- forkedSequenceTsFileResources,
- sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
- seqLevelNum, seqFileNumInEachLevel);
+ synchronized (sequenceTsFileResources) {
+ forkTsFileList(
+ forkedSequenceTsFileResources,
+ sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
+ seqLevelNum, seqFileNumInEachLevel);
+ }
// we have to copy all unseq file
- forkTsFileList(
- forkedUnSequenceTsFileResources,
- unSequenceTsFileResources
- .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources),
- unseqLevelNum + 1, unseqFileNumInEachLevel);
+ synchronized (unSequenceTsFileResources) {
+ forkTsFileList(
+ forkedUnSequenceTsFileResources,
+ unSequenceTsFileResources
+ .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources),
+ unseqLevelNum + 1, unseqFileNumInEachLevel);
+ }
}
private void forkTsFileList(
@@ -480,6 +510,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
} else {
CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
storageGroupName);
+ // log source file list and target file for recover
for (TsFileResource mergeResource : mergeResources.get(i)) {
compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
}
@@ -496,6 +527,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
TsFileResource newResource = new TsFileResource(newLevelFile);
+ // merge, read from source files and write to target file
CompactionUtils
.merge(newResource, toMergeTsFiles, storageGroupName, compactionLogger,
new HashSet<>(), sequence);
@@ -504,7 +536,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
storageGroupName, i, toMergeTsFiles.size());
writeLock();
try {
- compactionLogger.logMergeFinish();
if (sequence) {
sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
} else {
@@ -546,17 +577,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
return new File(prefixPath + level + TSFILE_SUFFIX);
}
- private List<TreeSet<TsFileResource>> newSequenceTsFileResources(Long k) {
- List<TreeSet<TsFileResource>> newSequenceTsFileResources = new CopyOnWriteArrayList<>();
+ private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
+ List<SortedSet<TsFileResource>> newSequenceTsFileResources = new CopyOnWriteArrayList<>();
for (int i = 0; i < seqLevelNum; i++) {
- newSequenceTsFileResources.add(new TreeSet<>(
+ newSequenceTsFileResources.add(Collections.synchronizedSortedSet(new TreeSet<>(
(o1, o2) -> {
- int rangeCompare = Long
- .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
- Long.parseLong(o2.getTsFile().getParentFile().getName()));
- return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
- : rangeCompare;
- }));
+ try {
+ int rangeCompare = Long
+ .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
+ Long.parseLong(o2.getTsFile().getParentFile().getName()));
+ return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
+ : rangeCompare;
+ } catch (NumberFormatException e) {
+ return compareFileName(o1.getTsFile(), o2.getTsFile());
+ }
+ })));
}
return newSequenceTsFileResources;
}
@@ -578,9 +613,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private TsFileResource getTsFileResource(String filePath, boolean isSeq) throws IOException {
if (isSeq) {
- for (List<TreeSet<TsFileResource>> tsFileResourcesWithLevel : sequenceTsFileResources
+ for (List<SortedSet<TsFileResource>> tsFileResourcesWithLevel : sequenceTsFileResources
.values()) {
- for (TreeSet<TsFileResource> tsFileResources : tsFileResourcesWithLevel) {
+ for (SortedSet<TsFileResource> tsFileResources : tsFileResourcesWithLevel) {
for (TsFileResource tsFileResource : tsFileResources) {
if (tsFileResource.getTsFile().getAbsolutePath().equals(filePath)) {
return tsFileResource;
@@ -603,4 +638,4 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
logger.error("cannot get tsfile resource path: {}", filePath);
throw new IOException();
}
-}
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index dec92e0..d80a41a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -47,11 +47,6 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
}
@Override
- public List<TsFileResource> getStableTsFileList(boolean sequence) {
- return getTsFileList(sequence);
- }
-
- @Override
public List<TsFileResource> getTsFileList(boolean sequence) {
if (sequence) {
return new ArrayList<>(sequenceFileTreeSet);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 80c31cb..8152cc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -149,7 +149,9 @@ public class CompactionUtils {
chunkWriter = new ChunkWriterImpl(
IoTDB.metaManager.getSeriesSchema(new PartialPath(device), entry.getKey()));
} catch (MetadataException e) {
- throw new IOException(e);
+ // this may caused in IT by restart
+ logger.error("{} get schema {} error,skip this sensor", device, entry.getKey());
+ return maxVersion;
}
for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
writeTVPair(timeValuePair, chunkWriter);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a91661b..5fb31f6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1317,14 +1317,6 @@ public class StorageGroupProcessor {
(System.currentTimeMillis() - startTime) / 1000);
}
}
- while (compactionMergeWorking) {
- closeStorageGroupCondition.wait(100);
- if (System.currentTimeMillis() - startTime > 60_000) {
- logger
- .warn("{} has spent {}s to wait for closing compaction.", this.storageGroupName,
- (System.currentTimeMillis() - startTime) / 1000);
- }
- }
} catch (InterruptedException e) {
logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
+ "group {}", storageGroupName, e);
@@ -1726,6 +1718,10 @@ public class StorageGroupProcessor {
} else {
closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
}
+ synchronized (closeStorageGroupCondition) {
+ closeStorageGroupCondition.notifyAll();
+ }
+ logger.info("signal closing storage group condition in {}", storageGroupName);
if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
.isTerminated()) {
compactionMergeWorking = true;
@@ -1745,10 +1741,6 @@ public class StorageGroupProcessor {
logger.info("{} last compaction merge task is working, skip current merge",
storageGroupName);
}
- synchronized (closeStorageGroupCondition) {
- closeStorageGroupCondition.notifyAll();
- }
- logger.info("signal closing storage group condition in {}", storageGroupName);
}
/**
@@ -1756,9 +1748,6 @@ public class StorageGroupProcessor {
*/
private void closeCompactionMergeCallBack() {
this.compactionMergeWorking = false;
- synchronized (closeStorageGroupCondition) {
- closeStorageGroupCondition.notifyAll();
- }
}
/**
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index c1ea53e..00db06a 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -78,6 +79,8 @@ public class EnvironmentUtils {
.parseBoolean(System.getProperty("test.port.closed", "false"));
public static void cleanEnv() throws IOException, StorageEngineException {
+ // wait all compaction finished
+ CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
logger.warn("EnvironmentUtil cleanEnv...");
if (daemon != null) {
daemon.stop();