You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/07/15 11:58:15 UTC
[iotdb] branch rel/0.12 updated: [IoTDB-1501][To rel/0.12] Fix
compaction recover delete tsfile bug (#3568)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new cac70a5 [IoTDB-1501][To rel/0.12] Fix compaction recover delete tsfile bug (#3568)
cac70a5 is described below
commit cac70a501e3b9ded50ea124b8b4499a845ca19a3
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Thu Jul 15 19:57:51 2021 +0800
[IoTDB-1501][To rel/0.12] Fix compaction recover delete tsfile bug (#3568)
---
.../compaction/CompactionMergeTaskPoolManager.java | 24 +++---
.../level/LevelCompactionTsFileManagement.java | 89 +++++++++++++---------
.../compaction/LevelCompactionRecoverTest.java | 14 +++-
3 files changed, 75 insertions(+), 52 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 9b7949c..2a40452 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -34,11 +34,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Collections;
-import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -56,7 +55,7 @@ public class CompactionMergeTaskPoolManager implements IService {
new CompactionMergeTaskPoolManager();
private ScheduledExecutorService scheduledPool;
private ExecutorService pool;
- private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+ private Map<String, List<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
@@ -69,12 +68,13 @@ public class CompactionMergeTaskPoolManager implements IService {
public void start() {
if (pool == null) {
this.pool =
- IoTDBThreadPoolFactory.newScheduledThreadPool(
+ IoTDBThreadPoolFactory.newFixedThreadPool(
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
ThreadName.COMPACTION_SERVICE.getName());
this.scheduledPool =
IoTDBThreadPoolFactory.newScheduledThreadPool(
- Integer.MAX_VALUE, ThreadName.COMPACTION_SERVICE.getName());
+ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
+ ThreadName.COMPACTION_SERVICE.getName());
}
logger.info("Compaction task manager started.");
}
@@ -175,17 +175,15 @@ public class CompactionMergeTaskPoolManager implements IService {
* corresponding storage group.
*/
public void abortCompaction(String storageGroup) {
- Set<Future<Void>> subTasks =
- storageGroupTasks.getOrDefault(storageGroup, Collections.emptySet());
- Iterator<Future<Void>> subIterator = subTasks.iterator();
- while (subIterator.hasNext()) {
- Future<Void> next = subIterator.next();
+ List<Future<Void>> subTasks =
+ storageGroupTasks.getOrDefault(storageGroup, Collections.emptyList());
+ for (Future<Void> next : subTasks) {
if (!next.isDone() && !next.isCancelled()) {
next.cancel(true);
sgCompactionStatus.put(storageGroup, false);
}
- subIterator.remove();
}
+ subTasks.clear();
}
public synchronized void clearCompactionStatus(String storageGroupName) {
@@ -213,7 +211,7 @@ public class CompactionMergeTaskPoolManager implements IService {
sgCompactionStatus.put(storageGroup, true);
Future<Void> future = pool.submit(storageGroupCompactionTask);
storageGroupTasks
- .computeIfAbsent(storageGroup, k -> new ConcurrentSkipListSet<>())
+ .computeIfAbsent(storageGroup, k -> new CopyOnWriteArrayList<>())
.add(future);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index f740c6b..fb5b601 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -472,6 +472,13 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
} else {
// get tsfile resource from list, as they have been recovered in StorageGroupProcessor
TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
+ if (targetResource == null) {
+ // new file already merged but old file not deleted
+ targetResource = getTsFileResource(targetFile, isSeq);
+ if (targetResource == null) {
+ throw new IOException();
+ }
+ }
long timePartition = targetResource.getTimePartition();
List<TsFileResource> sourceTsFileResources = new ArrayList<>();
for (String file : sourceFileList) {
@@ -484,6 +491,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+ List<Modification> modifications = new ArrayList<>();
// if not complete compaction, resume merge
if (writer.hasCrashed()) {
if (offset > 0) {
@@ -492,7 +500,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
writer.close();
CompactionLogger compactionLogger =
new CompactionLogger(storageGroupDir, storageGroupName);
- List<Modification> modifications = new ArrayList<>();
CompactionUtils.merge(
targetResource,
sourceTsFileResources,
@@ -501,31 +508,39 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
deviceSet,
isSeq,
modifications);
- // complete compaction and delete source file
- writeLock();
- try {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException(
- String.format("%s [Compaction] abort", storageGroupName));
- }
- int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
- if (isSeq) {
- sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
- sequenceRecoverTsFileResources.clear();
- } else {
- unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
- unSequenceRecoverTsFileResources.clear();
- }
- deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
- } finally {
- writeUnlock();
- }
- deleteLevelFilesInDisk(sourceTsFileResources);
- renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
compactionLogger.close();
+ // complete compaction and add target tsfile
+ int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
+ if (isSeq) {
+ sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+ sequenceRecoverTsFileResources.clear();
+ } else {
+ unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+ unSequenceRecoverTsFileResources.clear();
+ }
} else {
+ // complete compaction, just close writer
writer.close();
}
+ // complete compaction and delete source file
+ writeLock();
+ try {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException(
+ String.format("%s [Compaction] abort", storageGroupName));
+ }
+ deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
+ } finally {
+ writeUnlock();
+ }
+ for (TsFileResource tsFileResource : sourceTsFileResources) {
+ logger.error(
+ "{} recover storage group delete source file {}",
+ storageGroupName,
+ tsFileResource.getTsFile().getName());
+ }
+ deleteLevelFilesInDisk(sourceTsFileResources);
+ renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
}
}
} catch (IOException | IllegalPathException | InterruptedException e) {
@@ -787,23 +802,25 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
return newUnSequenceTsFileResources;
}
- private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq)
- throws IOException {
- if (isSeq) {
- for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
- if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
- return tsFileResource;
+ private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq) {
+ try {
+ if (isSeq) {
+ for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
+ if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+ return tsFileResource;
+ }
}
- }
- } else {
- for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
- if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
- return tsFileResource;
+ } else {
+ for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
+ if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+ return tsFileResource;
+ }
}
}
+ } catch (IOException e) {
+ logger.error("cannot get tsfile resource path: {}", filePath);
}
- logger.error("cannot get tsfile resource path: {}", filePath);
- throw new IOException();
+ return null;
}
private TsFileResource getTsFileResource(String filePath, boolean isSeq) {
@@ -868,6 +885,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (targetFilePath != null) {
File targetFile = new File(targetFilePath);
if (targetFile.exists()) {
+ logger.error(
+ "{} restore delete target file {} ", storageGroupName, targetFile.getName());
targetFile.delete();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index 7adffc2..ddd2005 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -139,7 +140,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
true,
new ArrayList<>());
compactionLogger.close();
- levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
+ levelCompactionTsFileManagement.add(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path =
@@ -254,7 +255,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
logStream.close();
- levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
+ levelCompactionTsFileManagement.add(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path =
@@ -408,6 +409,9 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
@Test
public void testCompactionMergeRecoverMergeFinishedUnseq()
throws IOException, IllegalPathException {
+ int prevUnseqLevelNum = IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum();
+ IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(2);
+
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
@@ -466,7 +470,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
false,
new ArrayList<>());
compactionLogger.close();
- levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
+ levelCompactionTsFileManagement.add(targetTsFileResource, false);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path =
@@ -493,6 +497,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
assertEquals(500, count);
+
+ IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
}
/** compaction recover merge start just log source file */
@@ -671,7 +677,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
new HashSet<>(),
true,
new ArrayList<>());
- levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
+ levelCompactionTsFileManagement.add(targetTsFileResource, true);
compactionLogger.close();
levelCompactionTsFileManagement.recover();
QueryContext context = new QueryContext();