You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/07/27 10:13:32 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] Revert commit
bf6d83cd (#3534) and commit cac70a50 (#3568) (#3634)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 30e3dad [To rel/0.12] Revert commit bf6d83cd (#3534) and commit cac70a50 (#3568) (#3634)
30e3dad is described below
commit 30e3dad13738bed588a020ed41c293eeae96d618
Author: Steve Yurong Su (宇荣) <ro...@apache.org>
AuthorDate: Tue Jul 27 05:13:06 2021 -0500
[To rel/0.12] Revert commit bf6d83cd (#3534) and commit cac70a50 (#3568) (#3634)
---
.../compaction/CompactionMergeTaskPoolManager.java | 31 ++++----
.../level/LevelCompactionTsFileManagement.java | 89 +++++++++-------------
.../compaction/LevelCompactionRecoverTest.java | 14 +---
3 files changed, 52 insertions(+), 82 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 2a40452..9f7ff5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -34,10 +34,11 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -53,9 +54,8 @@ public class CompactionMergeTaskPoolManager implements IService {
LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
private static final CompactionMergeTaskPoolManager INSTANCE =
new CompactionMergeTaskPoolManager();
- private ScheduledExecutorService scheduledPool;
- private ExecutorService pool;
- private Map<String, List<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
+ private ScheduledExecutorService pool;
+ private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>();
@@ -68,10 +68,6 @@ public class CompactionMergeTaskPoolManager implements IService {
public void start() {
if (pool == null) {
this.pool =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
- ThreadName.COMPACTION_SERVICE.getName());
- this.scheduledPool =
IoTDBThreadPoolFactory.newScheduledThreadPool(
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
ThreadName.COMPACTION_SERVICE.getName());
@@ -82,7 +78,6 @@ public class CompactionMergeTaskPoolManager implements IService {
@Override
public void stop() {
if (pool != null) {
- scheduledPool.shutdownNow();
pool.shutdownNow();
logger.info("Waiting for task pool to shut down");
waitTermination();
@@ -93,7 +88,6 @@ public class CompactionMergeTaskPoolManager implements IService {
@Override
public void waitAndStop(long milliseconds) {
if (pool != null) {
- awaitTermination(scheduledPool, milliseconds);
awaitTermination(pool, milliseconds);
logger.info("Waiting for task pool to shut down");
waitTermination();
@@ -148,7 +142,6 @@ public class CompactionMergeTaskPoolManager implements IService {
logger.warn("CompactionManager has wait for {} seconds to stop", time / 1000);
}
}
- scheduledPool = null;
pool = null;
storageGroupTasks.clear();
logger.info("CompactionManager stopped");
@@ -175,15 +168,17 @@ public class CompactionMergeTaskPoolManager implements IService {
* corresponding storage group.
*/
public void abortCompaction(String storageGroup) {
- List<Future<Void>> subTasks =
- storageGroupTasks.getOrDefault(storageGroup, Collections.emptyList());
- for (Future<Void> next : subTasks) {
+ Set<Future<Void>> subTasks =
+ storageGroupTasks.getOrDefault(storageGroup, Collections.emptySet());
+ Iterator<Future<Void>> subIterator = subTasks.iterator();
+ while (subIterator.hasNext()) {
+ Future<Void> next = subIterator.next();
if (!next.isDone() && !next.isCancelled()) {
next.cancel(true);
sgCompactionStatus.put(storageGroup, false);
}
+ subIterator.remove();
}
- subTasks.clear();
}
public synchronized void clearCompactionStatus(String storageGroupName) {
@@ -195,7 +190,7 @@ public class CompactionMergeTaskPoolManager implements IService {
}
public void init(Runnable function) {
- scheduledPool.scheduleWithFixedDelay(
+ pool.scheduleWithFixedDelay(
function, 1000, config.getCompactionInterval(), TimeUnit.MILLISECONDS);
}
@@ -211,7 +206,7 @@ public class CompactionMergeTaskPoolManager implements IService {
sgCompactionStatus.put(storageGroup, true);
Future<Void> future = pool.submit(storageGroupCompactionTask);
storageGroupTasks
- .computeIfAbsent(storageGroup, k -> new CopyOnWriteArrayList<>())
+ .computeIfAbsent(storageGroup, k -> new ConcurrentSkipListSet<>())
.add(future);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index fb5b601..f740c6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -472,13 +472,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
} else {
// get tsfile resource from list, as they have been recovered in StorageGroupProcessor
TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
- if (targetResource == null) {
- // new file already merged but old file not deleted
- targetResource = getTsFileResource(targetFile, isSeq);
- if (targetResource == null) {
- throw new IOException();
- }
- }
long timePartition = targetResource.getTimePartition();
List<TsFileResource> sourceTsFileResources = new ArrayList<>();
for (String file : sourceFileList) {
@@ -491,7 +484,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
- List<Modification> modifications = new ArrayList<>();
// if not complete compaction, resume merge
if (writer.hasCrashed()) {
if (offset > 0) {
@@ -500,6 +492,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
writer.close();
CompactionLogger compactionLogger =
new CompactionLogger(storageGroupDir, storageGroupName);
+ List<Modification> modifications = new ArrayList<>();
CompactionUtils.merge(
targetResource,
sourceTsFileResources,
@@ -508,39 +501,31 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
deviceSet,
isSeq,
modifications);
- compactionLogger.close();
- // complete compaction and add target tsfile
- int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
- if (isSeq) {
- sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
- sequenceRecoverTsFileResources.clear();
- } else {
- unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
- unSequenceRecoverTsFileResources.clear();
+ // complete compaction and delete source file
+ writeLock();
+ try {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException(
+ String.format("%s [Compaction] abort", storageGroupName));
+ }
+ int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
+ if (isSeq) {
+ sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+ sequenceRecoverTsFileResources.clear();
+ } else {
+ unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
+ unSequenceRecoverTsFileResources.clear();
+ }
+ deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
+ } finally {
+ writeUnlock();
}
+ deleteLevelFilesInDisk(sourceTsFileResources);
+ renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
+ compactionLogger.close();
} else {
- // complete compaction, just close writer
writer.close();
}
- // complete compaction and delete source file
- writeLock();
- try {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException(
- String.format("%s [Compaction] abort", storageGroupName));
- }
- deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
- } finally {
- writeUnlock();
- }
- for (TsFileResource tsFileResource : sourceTsFileResources) {
- logger.error(
- "{} recover storage group delete source file {}",
- storageGroupName,
- tsFileResource.getTsFile().getName());
- }
- deleteLevelFilesInDisk(sourceTsFileResources);
- renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
}
}
} catch (IOException | IllegalPathException | InterruptedException e) {
@@ -802,25 +787,23 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
return newUnSequenceTsFileResources;
}
- private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq) {
- try {
- if (isSeq) {
- for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
- if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
- return tsFileResource;
- }
+ private TsFileResource getRecoverTsFileResource(String filePath, boolean isSeq)
+ throws IOException {
+ if (isSeq) {
+ for (TsFileResource tsFileResource : sequenceRecoverTsFileResources) {
+ if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+ return tsFileResource;
}
- } else {
- for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
- if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
- return tsFileResource;
- }
+ }
+ } else {
+ for (TsFileResource tsFileResource : unSequenceRecoverTsFileResources) {
+ if (Files.isSameFile(tsFileResource.getTsFile().toPath(), new File(filePath).toPath())) {
+ return tsFileResource;
}
}
- } catch (IOException e) {
- logger.error("cannot get tsfile resource path: {}", filePath);
}
- return null;
+ logger.error("cannot get tsfile resource path: {}", filePath);
+ throw new IOException();
}
private TsFileResource getTsFileResource(String filePath, boolean isSeq) {
@@ -885,8 +868,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (targetFilePath != null) {
File targetFile = new File(targetFilePath);
if (targetFile.exists()) {
- logger.error(
- "{} restore delete target file {} ", storageGroupName, targetFile.getName());
targetFile.delete();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index ddd2005..7adffc2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -140,7 +139,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
true,
new ArrayList<>());
compactionLogger.close();
- levelCompactionTsFileManagement.add(targetTsFileResource, true);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path =
@@ -255,7 +254,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
logStream.close();
- levelCompactionTsFileManagement.add(targetTsFileResource, true);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path =
@@ -409,9 +408,6 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
@Test
public void testCompactionMergeRecoverMergeFinishedUnseq()
throws IOException, IllegalPathException {
- int prevUnseqLevelNum = IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum();
- IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(2);
-
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
@@ -470,7 +466,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
false,
new ArrayList<>());
compactionLogger.close();
- levelCompactionTsFileManagement.add(targetTsFileResource, false);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, false);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path =
@@ -497,8 +493,6 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
assertEquals(500, count);
-
- IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
}
/** compaction recover merge start just log source file */
@@ -677,7 +671,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
new HashSet<>(),
true,
new ArrayList<>());
- levelCompactionTsFileManagement.add(targetTsFileResource, true);
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
compactionLogger.close();
levelCompactionTsFileManagement.recover();
QueryContext context = new QueryContext();