You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/01/23 14:57:08 UTC
[iotdb] branch master updated: [IOTDB-2453] Remove all not necessarily lock in compaction process (#4952)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4e87d43 [IOTDB-2453] Remove all not necessarily lock in compaction process (#4952)
4e87d43 is described below
commit 4e87d43d0bd18b30f96db9745bdef89c6ee1f4c1
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Sun Jan 23 22:56:17 2022 +0800
[IOTDB-2453] Remove all not necessarily lock in compaction process (#4952)
---
.../db/integration/IoTDBRemovePartitionIT.java | 5 -
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../db/engine/compaction/CompactionScheduler.java | 6 -
.../engine/compaction/CompactionTaskManager.java | 11 +-
.../AbstractCrossSpaceCompactionSelector.java | 8 +-
.../CrossSpaceCompactionExceptionHandler.java | 3 +-
.../RewriteCrossSpaceCompactionSelector.java | 11 -
.../task/RewriteCrossSpaceCompactionTask.java | 27 +-
.../AbstractInnerSpaceCompactionSelector.java | 11 +-
.../inner/InnerSpaceCompactionTaskFactory.java | 2 -
.../sizetiered/SizeTieredCompactionSelector.java | 7 -
.../inner/sizetiered/SizeTieredCompactionTask.java | 65 +-
.../db/engine/storagegroup/TsFileManager.java | 6 +-
.../db/engine/storagegroup/TsFileResource.java | 4 +
.../engine/compaction/CompactionSchedulerTest.java | 940 ++++-----------------
.../compaction/CompactionTaskManagerTest.java | 17 +-
.../cross/CrossSpaceCompactionExceptionTest.java | 15 +-
.../inner/InnerCompactionSchedulerTest.java | 3 +
.../SizeTieredCompactionHandleExceptionTest.java | 186 ----
19 files changed, 229 insertions(+), 1100 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
index 18df741..1ae8258 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
@@ -66,11 +66,6 @@ public class IoTDBRemovePartitionIT {
StorageEngine.setEnablePartition(false);
StorageEngine.setTimePartitionInterval(-1);
EnvironmentUtils.cleanEnv();
-
- ch.qos.logback.classic.Logger rootLogger =
- (ch.qos.logback.classic.Logger)
- LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
- rootLogger.setLevel(Level.toLevel("warn"));
}
@Test
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b5009d4..75d77d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1417,7 +1417,7 @@ public class IoTDBConfig {
return crossCompactionMemoryBudget;
}
- void setCrossCompactionMemoryBudget(long crossCompactionMemoryBudget) {
+ public void setCrossCompactionMemoryBudget(long crossCompactionMemoryBudget) {
this.crossCompactionMemoryBudget = crossCompactionMemoryBudget;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
index a80f675..6b907fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
@@ -28,9 +28,6 @@ import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTaskFacto
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* CompactionScheduler schedules and submits the compaction task periodically, and it counts the
* total number of running compaction task. There are three compaction strategy: BALANCE,
@@ -44,9 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class CompactionScheduler {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // fullStorageGroupName -> timePartition -> compactionCount
- private static volatile Map<String, Map<Long, Long>> compactionCountInPartition =
- new ConcurrentHashMap<>();
public static void scheduleCompaction(TsFileManager tsFileManager, long timePartition) {
if (!tsFileManager.isAllowCompaction()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 797bf30..6424a6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -373,8 +373,12 @@ public class CompactionTaskManager implements IService {
}
@TestOnly
- public void restart() {
+ public void restart() throws InterruptedException {
if (IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() > 0) {
+ if (taskExecutionPool != null) {
+ this.taskExecutionPool.shutdownNow();
+ this.taskExecutionPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
this.taskExecutionPool =
(WrappedScheduledExecutorService)
IoTDBThreadPoolFactory.newScheduledThreadPool(
@@ -389,4 +393,9 @@ public class CompactionTaskManager implements IService {
currentTaskNum = new AtomicInteger(0);
logger.info("Compaction task manager started.");
}
+
+ @TestOnly
+ public void clearCandidateQueue() {
+ candidateCompactionTaskQueue.clear();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/AbstractCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/AbstractCrossSpaceCompactionSelector.java
index 92f5517..99e2c93 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/AbstractCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/AbstractCrossSpaceCompactionSelector.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.engine.compaction.cross;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import java.util.List;
public abstract class AbstractCrossSpaceCompactionSelector extends AbstractCompactionSelector {
protected String logicalStorageGroupName;
@@ -28,8 +30,8 @@ public abstract class AbstractCrossSpaceCompactionSelector extends AbstractCompa
protected String storageGroupDir;
protected long timePartition;
protected TsFileManager tsFileManager;
- protected TsFileResourceList sequenceFileList;
- protected TsFileResourceList unsequenceFileList;
+ protected List<TsFileResource> sequenceFileList;
+ protected List<TsFileResource> unsequenceFileList;
protected CrossSpaceCompactionTaskFactory taskFactory;
public AbstractCrossSpaceCompactionSelector(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionHandler.java
index eb7dc80..47998ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionHandler.java
@@ -50,8 +50,7 @@ public class CrossSpaceCompactionExceptionHandler {
List<TsFileResource> targetResourceList,
List<TsFileResource> seqResourceList,
List<TsFileResource> unseqResourceList,
- TsFileManager tsFileManager,
- long timePartiionId) {
+ TsFileManager tsFileManager) {
try {
if (logFile == null || !logFile.exists()) {
// the log file is null or the log file does not exists
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
index 455f3e8..1c68f2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
@@ -71,17 +71,6 @@ public class RewriteCrossSpaceCompactionSelector extends AbstractCrossSpaceCompa
public void selectAndSubmit() {
if ((CompactionTaskManager.currentTaskNum.get() >= config.getConcurrentCompactionThread())
|| (!config.isEnableCrossSpaceCompaction())) {
- if (CompactionTaskManager.currentTaskNum.get() >= config.getConcurrentCompactionThread()) {
- LOGGER.debug("End selection because too many threads");
- } else if (!config.isEnableCrossSpaceCompaction()) {
- LOGGER.debug("End selection because cross compaction is not enable");
- } else {
- LOGGER.debug(
- "End selection because {}-{} is compacting, task num in CompactionTaskManager is {}",
- logicalStorageGroupName,
- virtualGroupId,
- CompactionTaskManager.currentTaskNum.get());
- }
return;
}
Iterator<TsFileResource> seqIterator = sequenceFileList.iterator();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
index 96459cf..362573c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.cross.AbstractCrossSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.cross.CrossSpaceCompactionExceptionHandler;
@@ -29,7 +28,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.WriteLockFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
@@ -62,9 +60,6 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
private List<TsFileResource> targetTsfileResourceList;
private List<TsFileResource> holdReadLockList = new ArrayList<>();
private List<TsFileResource> holdWriteLockList = new ArrayList<>();
- private boolean getWriteLockOfManager = false;
- private final long ACQUIRE_WRITE_LOCK_TIMEOUT =
- IoTDBDescriptor.getInstance().getConfig().getCompactionAcquireWriteLockTimeout();
public RewriteCrossSpaceCompactionTask(
String logicalStorageGroupName,
@@ -97,14 +92,10 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
targetTsfileResourceList,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList,
- tsFileManager,
- timePartition);
+ tsFileManager);
throw throwable;
} finally {
releaseAllLock();
- if (getWriteLockOfManager) {
- tsFileManager.writeUnlock();
- }
}
}
@@ -158,22 +149,6 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
CompactionUtils.combineModsInCompaction(
selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList, targetTsfileResourceList);
- try {
- tsFileManager.writeLockWithTimeout(
- "rewrite-cross-space compaction", ACQUIRE_WRITE_LOCK_TIMEOUT);
- getWriteLockOfManager = true;
- } catch (WriteLockFailedException e) {
- // if current compaction thread couldn't get write lock
- // a WriteLockFailException will be thrown, then terminate the thread itself
- logger.error(
- "{} [Compaction] CrossSpaceCompactionTask failed to get write lock, abort the task.",
- fullStorageGroupName,
- e);
- throw new InterruptedException(
- String.format(
- "%s [Compaction] compaction abort because cannot acquire write lock",
- fullStorageGroupName));
- }
deleteOldFiles(selectedSeqTsFileResourceList);
deleteOldFiles(selectedUnSeqTsFileResourceList);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionSelector.java
index 3a11876..405e9c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionSelector.java
@@ -20,13 +20,15 @@ package org.apache.iotdb.db.engine.compaction.inner;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import java.util.List;
public abstract class AbstractInnerSpaceCompactionSelector extends AbstractCompactionSelector {
protected String logicalStorageGroupName;
protected String virtualStorageGroupName;
protected long timePartition;
- protected TsFileResourceList tsFileResources;
+ protected List<TsFileResource> tsFileResources;
protected boolean sequence;
protected InnerSpaceCompactionTaskFactory taskFactory;
protected TsFileManager tsFileManager;
@@ -45,9 +47,10 @@ public abstract class AbstractInnerSpaceCompactionSelector extends AbstractCompa
this.sequence = sequence;
this.taskFactory = taskFactory;
if (sequence) {
- tsFileResources = tsFileManager.getSequenceListByTimePartition(timePartition);
+ tsFileResources = tsFileManager.getSequenceListByTimePartition(timePartition).getArrayList();
} else {
- tsFileResources = tsFileManager.getUnsequenceListByTimePartition(timePartition);
+ tsFileResources =
+ tsFileManager.getUnsequenceListByTimePartition(timePartition).getArrayList();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTaskFactory.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTaskFactory.java
index cf1be5d..49ba832 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTaskFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTaskFactory.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import java.util.List;
@@ -33,7 +32,6 @@ public class InnerSpaceCompactionTaskFactory {
String virtualStorageGroup,
long timePartition,
TsFileManager tsFileManager,
- TsFileResourceList tsFileResourceList,
List<TsFileResource> selectedTsFileResourceList,
boolean sequence) {
return IoTDBDescriptor.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index 7ee6244..662aab7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.CompactionPriority;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionSelector;
import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionTaskFactory;
@@ -80,9 +79,6 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
*/
@Override
public void selectAndSubmit() {
- final CompactionPriority priority =
- IoTDBDescriptor.getInstance().getConfig().getCompactionPriority();
- tsFileResources.readLock();
PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
try {
@@ -97,8 +93,6 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
}
} catch (Exception e) {
LOGGER.error("Exception occurs while selecting files", e);
- } finally {
- tsFileResources.readUnlock();
}
}
@@ -178,7 +172,6 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
virtualStorageGroupName,
timePartition,
tsFileManager,
- tsFileResources,
selectedFileList,
sequence);
return CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index 0521005..8dd79f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionExceptionHandler;
@@ -29,7 +28,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
-import org.apache.iotdb.db.exception.WriteLockFailedException;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -90,9 +88,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
return;
}
long startTime = System.currentTimeMillis();
- boolean getWriteLockOfManager = false;
- final long ACQUIRE_WRITE_LOCK_TIMEOUT =
- IoTDBDescriptor.getInstance().getConfig().getCompactionAcquireWriteLockTimeout();
// get resource of target file
String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
// Here is tmpTargetFile, which is xxx.target
@@ -160,27 +155,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
InnerSpaceCompactionUtils.combineModsInCompaction(
selectedTsFileResourceList, targetTsFileResource);
- LOGGER.info(
- "{} [Compaction] Get the write lock of files, try to get the write lock of TsFileResourceList",
- fullStorageGroupName);
- // get write lock for TsFileResource list with timeout
- try {
- tsFileManager.writeLockWithTimeout("size-tired compaction", ACQUIRE_WRITE_LOCK_TIMEOUT);
- getWriteLockOfManager = true;
- } catch (WriteLockFailedException e) {
- // if current compaction thread couldn't get write lock
- // a WriteLockFailException will be thrown, then terminate the thread itself
- LOGGER.warn(
- "{} [SizeTiredCompactionTask] failed to get write lock, abort the task and delete the target file {}",
- fullStorageGroupName,
- targetTsFileResource.getTsFile(),
- e);
- throw new InterruptedException(
- String.format(
- "%s [Compaction] compaction abort because cannot acquire write lock",
- fullStorageGroupName));
- }
-
if (targetTsFileResource.getTsFile().length()
< TSFileConfig.MAGIC_STRING.getBytes().length * 2L + Byte.BYTES) {
// the file size is smaller than magic string and version number
@@ -235,9 +209,6 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
tsFileResourceList);
} finally {
releaseFileLocksAndResetMergingStatus(true);
- if (getWriteLockOfManager) {
- tsFileManager.writeUnlock();
- }
}
}
@@ -255,29 +226,25 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
@Override
public boolean checkValidAndSetMerging() {
- tsFileResourceList.readLock();
- try {
- for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
- TsFileResource resource = selectedTsFileResourceList.get(i);
- resource.readLock();
- isHoldingReadLock[i] = true;
- if (resource.isCompacting() | !resource.isClosed()
- || !resource.getTsFile().exists()
- || resource.isDeleted()) {
- // this source file cannot be compacted
- // release the lock of locked files, and return
- releaseFileLocksAndResetMergingStatus(false);
- return false;
- }
- }
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ TsFileResource resource = selectedTsFileResourceList.get(i);
- for (TsFileResource resource : selectedTsFileResourceList) {
- resource.setCompacting(true);
+ if (resource.isCompacting() | !resource.isClosed()
+ || !resource.getTsFile().exists()
+ || resource.isDeleted()) {
+ // this source file cannot be compacted
+ // release the lock of locked files, and return
+ releaseFileLocksAndResetMergingStatus(false);
+ return false;
}
- return true;
- } finally {
- tsFileResourceList.readUnlock();
+ resource.readLock();
+ isHoldingReadLock[i] = true;
+ }
+
+ for (TsFileResource resource : selectedTsFileResourceList) {
+ resource.setCompacting(true);
}
+ return true;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index c72311e..92b7204 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -67,6 +67,8 @@ public class TsFileManager {
}
public List<TsFileResource> getTsFileList(boolean sequence) {
+ // the iteration of ConcurrentSkipListMap is not concurrent secure
+ // so we must add read lock here
readLock();
try {
List<TsFileResource> allResources = new ArrayList<>();
@@ -98,7 +100,7 @@ public class TsFileManager {
}
public void remove(TsFileResource tsFileResource, boolean sequence) {
- writeLock("remove");
+ readLock();
try {
Map<Long, TsFileResourceList> selectedMap = sequence ? sequenceFiles : unsequenceFiles;
for (Map.Entry<Long, TsFileResourceList> entry : selectedMap.entrySet()) {
@@ -109,7 +111,7 @@ public class TsFileManager {
}
}
} finally {
- writeUnlock();
+ readUnlock();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 401039c..9950f5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -507,6 +507,10 @@ public class TsFileResource {
return tsFileLock.tryWriteLock();
}
+ public boolean tryReadLock() {
+ return tsFileLock.tryReadLock();
+ }
+
void doUpgrade() {
UpgradeSevice.getINSTANCE().submitUpgradeTask(new UpgradeTask(this));
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index 8e5aca6..6a290d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -123,13 +123,7 @@ public class CompactionSchedulerTest {
IoTDB.metaManager.clear();
CompactionClearUtils.clearAllCompactionFiles();
EnvironmentUtils.cleanAllDir();
- try {
- Thread.sleep(10_000);
- } catch (InterruptedException e) {
-
- } finally {
- CompactionClearUtils.deleteEmptyDir(new File("target"));
- }
+ CompactionClearUtils.deleteEmptyDir(new File("target"));
CompactionTaskManager.getInstance().stop();
}
@@ -138,7 +132,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=50 max_compaction_candidate_file_num=100
*/
@Test
- public void test1() throws IOException, IllegalPathException {
+ public void test1() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test1");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -185,12 +179,14 @@ public class CompactionSchedulerTest {
tsFileManager.add(tsFileResource, false);
}
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
long totalWaitingTime = 0;
+
while (tsFileManager.getTsFileList(true).size() > 1) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -198,80 +194,44 @@ public class CompactionSchedulerTest {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
}
+
totalWaitingTime = 0;
+
while (tsFileManager.getTsFileList(false).size() > 1) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
+
while (tsFileManager.getTsFileList(false).size() > 0) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -293,7 +253,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=50 max_compaction_candidate_file_num=100
*/
@Test
- public void test2() throws IOException, IllegalPathException {
+ public void test2() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test2");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -310,7 +270,7 @@ public class CompactionSchedulerTest {
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(100);
IoTDBDescriptor.getInstance()
.getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
+ .setCrossCompactionMemoryBudget(2 * 1024 * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -344,40 +304,18 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 1) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
- while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
@@ -405,13 +343,7 @@ public class CompactionSchedulerTest {
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -433,7 +365,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=50 max_compaction_candidate_file_num=100
*/
@Test
- public void test3() throws IOException, IllegalPathException {
+ public void test3() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test3");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -447,9 +379,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(100);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -479,73 +408,36 @@ public class CompactionSchedulerTest {
tsFileManager.add(tsFileResource, false);
}
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 1) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
-
- while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
assertEquals(100, tsFileManager.getTsFileList(false).size());
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 0) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
- if (totalWaitingTime > MAX_WAITING_TIME) {
- fail();
- break;
- }
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(1, tsFileManager.getTsFileList(true).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -567,7 +459,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=50 max_compaction_candidate_file_num=100
*/
@Test
- public void test4() throws IOException, IllegalPathException {
+ public void test4() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test4");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -611,48 +503,24 @@ public class CompactionSchedulerTest {
fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
tsFileManager.add(tsFileResource, false);
}
- while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
-
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 0) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -673,7 +541,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=1 max_compaction_candidate_file_num=100
*/
@Test
- public void test5() throws IOException, IllegalPathException {
+ public void test5() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test5");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -688,9 +556,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(100);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -720,103 +585,54 @@ public class CompactionSchedulerTest {
tsFileManager.add(tsFileResource, false);
}
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 1) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "The number of sequence tsfile is {}, {} is wanted",
- tsFileManager.getTsFileList(true).size(),
- 1);
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 1) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 0) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -838,7 +654,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=1 max_compaction_candidate_file_num=100
*/
@Test
- public void test6() throws IOException, IllegalPathException {
+ public void test6() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test6");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -853,9 +669,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(100);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -890,67 +703,38 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 1) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 0) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -971,7 +755,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=1 max_compaction_candidate_file_num=100
*/
@Test
- public void test7() throws IOException, IllegalPathException {
+ public void test7() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test7");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -985,9 +769,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(100);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -1022,68 +803,38 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 1) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 0) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
-
IoTDBDescriptor.getInstance()
.getConfig()
.setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
@@ -1103,7 +854,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=1 max_compaction_candidate_file_num=100
*/
@Test
- public void test8() throws IOException, IllegalPathException {
+ public void test8() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test8");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -1117,9 +868,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(100);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -1149,49 +897,27 @@ public class CompactionSchedulerTest {
tsFileManager.add(tsFileResource, false);
}
- while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 0) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
-
IoTDBDescriptor.getInstance()
.getConfig()
.setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
@@ -1211,7 +937,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=50 max_compaction_candidate_file_num=2
*/
@Test
- public void test9() throws IOException, IllegalPathException {
+ public void test9() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test9");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -1225,9 +951,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(2);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -1262,57 +985,33 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 50) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- assertEquals(100, tsFileManager.getTsFileList(false).size());
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 25) {
- try {
- Thread.sleep(100);
- } catch (Exception e) {
-
- }
+ Thread.sleep(100);
totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
}
}
assertTrue(tsFileManager.getTsFileList(true).size() <= 25);
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
@@ -1334,7 +1033,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=50 max_compaction_candidate_file_num=2
*/
@Test
- public void test10() throws IOException, IllegalPathException {
+ public void test10() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test10");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -1351,9 +1050,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(2);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -1388,67 +1084,38 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 50) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 25) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -1472,7 +1139,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=50 max_compaction_candidate_file_num=2
*/
@Test
- public void test11() throws IOException, IllegalPathException {
+ public void test11() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test11");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -1486,9 +1153,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(2);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -1523,67 +1187,38 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 50) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(false).size());
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 25) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -1605,7 +1240,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=50 max_compaction_candidate_file_num=2
*/
@Test
- public void test12() throws IOException, IllegalPathException {
+ public void test12() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test12");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -1619,9 +1254,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(2);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -1657,68 +1289,39 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 98) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 96) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -1734,145 +1337,13 @@ public class CompactionSchedulerTest {
.setMaxCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
}
}
- /**
- * enable_seq_space_compaction=true enable_unseq_space_compaction=true
- * compaction_concurrent_thread=1 max_compaction_candidate_file_num=2
- */
- @Test
- public void test13() throws IOException, IllegalPathException {
- logger.warn("Running test13");
- boolean prevEnableSeqSpaceCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
- IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(true);
- boolean prevEnableUnseqSpaceCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
- IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
- int prevCompactionConcurrentThread =
- IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
- IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(1);
- int prevMaxCompactionCandidateFileNum =
- IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
- IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(2);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
- try {
- CompactionTaskManager.getInstance().restart();
- TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", "target");
- for (int i = 0; i < 100; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1, COMPACTION_TEST_SG);
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
- tsFileManager.add(tsFileResource, true);
- }
- for (int i = 0; i < 100; i++) {
- Set<String> fullPath = new HashSet<>(Arrays.asList(fullPaths));
- List<List<Long>> chunkPagePointsNum = new ArrayList<>();
- List<Long> pagePointsNum = new ArrayList<>();
- pagePointsNum.add(100L);
- chunkPagePointsNum.add(pagePointsNum);
- TsFileResource tsFileResource =
- CompactionFileGeneratorUtils.generateTsFileResource(false, i + 1, COMPACTION_TEST_SG);
- CompactionFileGeneratorUtils.writeTsFile(
- fullPath, chunkPagePointsNum, 100 * i + 50, tsFileResource);
- tsFileManager.add(tsFileResource, false);
- }
-
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
- long totalWaitingTime = 0;
- while (tsFileManager.getTsFileList(true).size() > 99) {
- try {
- Thread.sleep(10);
- totalWaitingTime += 10;
- if (totalWaitingTime > MAX_WAITING_TIME) {
- fail();
- break;
- }
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- assertEquals(100, tsFileManager.getTsFileList(false).size());
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
- totalWaitingTime = 0;
- while (tsFileManager.getTsFileList(true).size() > 98) {
- try {
- Thread.sleep(10);
- totalWaitingTime += 10;
- if (totalWaitingTime > MAX_WAITING_TIME) {
- fail();
- break;
- }
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- assertEquals(100, tsFileManager.getTsFileList(false).size());
- tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setEnableUnseqSpaceCompaction(prevEnableUnseqSpaceCompaction);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setConcurrentCompactionThread(prevCompactionConcurrentThread);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setMaxCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
- }
- }
/**
* enable_seq_space_compaction=false enable_unseq_space_compaction=true
* compaction_concurrent_thread=1 max_compaction_candidate_file_num=2
*/
@Test
- public void test14() throws IOException, IllegalPathException {
+ public void test14() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test14");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -1886,9 +1357,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(2);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", "target");
@@ -1922,69 +1390,39 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 99) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 98) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- logger.warn("waiting");
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -2005,7 +1443,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=1 max_compaction_candidate_file_num=2
*/
@Test
- public void test15() throws IOException, IllegalPathException {
+ public void test15() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test15");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -2019,9 +1457,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(2);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -2056,70 +1491,40 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 99) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(false).size());
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(true).size() > 98) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(false).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
-
IoTDBDescriptor.getInstance()
.getConfig()
.setEnableSeqSpaceCompaction(prevEnableSeqSpaceCompaction);
@@ -2139,7 +1544,7 @@ public class CompactionSchedulerTest {
* compaction_concurrent_thread=1 max_compaction_candidate_file_num=2
*/
@Test
- public void test16() throws IOException, IllegalPathException {
+ public void test16() throws IOException, IllegalPathException, InterruptedException {
logger.warn("Running test16");
boolean prevEnableSeqSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
@@ -2153,9 +1558,6 @@ public class CompactionSchedulerTest {
int prevMaxCompactionCandidateFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCompactionCandidateFileNum();
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(2);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setTargetCompactionFileSize(2L * 1024L * 1024L * 1024L);
try {
CompactionTaskManager.getInstance().restart();
@@ -2190,68 +1592,39 @@ public class CompactionSchedulerTest {
long totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 98) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
totalWaitingTime = 0;
while (tsFileManager.getTsFileList(false).size() > 96) {
try {
- Thread.sleep(10);
- totalWaitingTime += 10;
+ Thread.sleep(100);
+ totalWaitingTime += 100;
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
if (totalWaitingTime > MAX_WAITING_TIME) {
fail();
break;
}
- if (totalWaitingTime % 10_000 == 0) {
- logger.warn(
- "sequence file num is {}, unsequence file num is {}",
- tsFileManager.getTsFileList(true).size(),
- tsFileManager.getTsFileList(false).size());
- }
- if (totalWaitingTime % SCHEDULE_AGAIN_TIME == 0) {
- logger.warn("Has waited for {} s, Schedule again", totalWaitingTime / 1000);
- CompactionScheduler.scheduleCompaction(tsFileManager, 0);
- }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
assertEquals(100, tsFileManager.getTsFileList(true).size());
tsFileManager.setAllowCompaction(false);
- while (CompactionTaskManager.currentTaskNum.get() > 0) {
- try {
- Thread.sleep(10);
- } catch (Exception e) {
-
- }
- }
+ stopCompactionTaskManager();
} finally {
IoTDBDescriptor.getInstance()
.getConfig()
@@ -2267,4 +1640,15 @@ public class CompactionSchedulerTest {
.setMaxCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
}
}
+
+ public void stopCompactionTaskManager() {
+ CompactionTaskManager.getInstance().clearCandidateQueue();
+ while (CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0) {
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+
+ }
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index e1670eb..09603cc 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -46,6 +46,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
if (tempSGDir.exists()) {
FileUtils.deleteDirectory(tempSGDir);
}
+ CompactionTaskManager.getInstance().restart();
Assert.assertTrue(tempSGDir.mkdirs());
super.setUp();
}
@@ -62,7 +63,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
SizeTieredCompactionTask task2 =
new SizeTieredCompactionTask(
"root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
- tsFileManager.writeLock("test");
+ seqResources.get(0).readLock();
CompactionTaskManager manager = CompactionTaskManager.getInstance();
try {
Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
@@ -72,7 +73,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
Assert.assertEquals(manager.getTotalTaskCount(), 1);
manager.submitTaskFromTaskQueue();
} finally {
- tsFileManager.writeUnlock();
+ seqResources.get(0).readUnlock();
}
Thread.sleep(5000);
Assert.assertEquals(0, manager.getTotalTaskCount());
@@ -101,7 +102,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
SizeTieredCompactionTask task2 =
new SizeTieredCompactionTask(
"root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
- tsFileManager.writeLock("test");
+ seqResources.get(0).readLock();
try {
CompactionTaskManager manager = CompactionTaskManager.getInstance();
manager.addTaskToWaitingQueue(task1);
@@ -111,7 +112,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
Assert.assertEquals(manager.getExecutingTaskCount(), 1);
Assert.assertFalse(manager.addTaskToWaitingQueue(task2));
} finally {
- tsFileManager.writeUnlock();
+ seqResources.get(0).readUnlock();
}
long waitingTime = 0;
while (CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0) {
@@ -144,11 +145,12 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
while (manager.getTotalTaskCount() > 0) {
Thread.sleep(10);
}
- tsFileManager.writeLock("test");
+ seqResources.get(0).readLock();
// an invalid task can be submitted to waiting queue, but should not be submitted to thread pool
Assert.assertTrue(manager.addTaskToWaitingQueue(task2));
manager.submitTaskFromTaskQueue();
Assert.assertEquals(manager.getExecutingTaskCount(), 0);
+ seqResources.get(0).readUnlock();
long waitingTime = 0;
while (manager.getRunningCompactionTaskList().size() > 0) {
Thread.sleep(100);
@@ -172,7 +174,8 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
new SizeTieredCompactionTask(
"root.compactionTest", "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
CompactionTaskManager manager = CompactionTaskManager.getInstance();
- tsFileManager.writeLock("test");
+ manager.restart();
+ seqResources.get(0).readLock();
try {
manager.addTaskToWaitingQueue(task1);
manager.submitTaskFromTaskQueue();
@@ -182,7 +185,7 @@ public class CompactionTaskManagerTest extends InnerCompactionTest {
Assert.assertEquals(1, runningList.size());
Assert.assertTrue(runningList.contains(task1));
} finally {
- tsFileManager.writeUnlock();
+ seqResources.get(0).readUnlock();
}
// after execution, task should remove itself from running list
Thread.sleep(5000);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
index de88492..652591d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
@@ -102,8 +102,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager,
- 0);
+ tsFileManager);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
@@ -172,8 +171,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager,
- 0);
+ tsFileManager);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
@@ -243,8 +241,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager,
- 0);
+ tsFileManager);
// all source file should not exist
for (TsFileResource resource : seqResources) {
Assert.assertFalse(resource.getTsFile().exists());
@@ -332,8 +329,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager,
- 0);
+ tsFileManager);
// All source file should not exist. All compaction mods file and old mods file of each source
// file should not exist
for (TsFileResource resource : seqResources) {
@@ -435,8 +431,7 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
targetResources,
seqResources,
unseqResources,
- tsFileManager,
- 0);
+ tsFileManager);
// all source file should still exist
for (TsFileResource resource : seqResources) {
Assert.assertTrue(resource.getTsFile().exists());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 2d52737..ffa05b2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -84,8 +84,11 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setMaxCompactionCandidateFileNum(4);
IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(1000000);
createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
+ registerTimeseriesInMManger(2, 3, false);
createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
+ registerTimeseriesInMManger(3, 5, false);
createFiles(2, 5, 5, 50, 600, 800, 50, 50, false, true);
+ registerTimeseriesInMManger(5, 5, false);
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java
deleted file mode 100644
index fdd4713..0000000
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionHandleExceptionTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTest;
-import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
-import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class SizeTieredCompactionHandleExceptionTest extends AbstractInnerSpaceCompactionTest {
- @Before
- public void setUp() throws IOException, MetadataException, WriteProcessException {
- this.seqFileNum = 10;
- super.setUp();
- }
-
- @After
- public void tearDown() throws StorageEngineException, IOException {
- new CompactionConfigRestorer().restoreCompactionConfig();
- super.tearDown();
- }
-
- @Test
- public void testHandleExceptionTargetCompleteAndSourceExists() {
- IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(2_000);
- try {
- tsFileManager.addAll(seqResources, true);
- tsFileManager.addAll(unseqResources, false);
- SizeTieredCompactionTask task =
- new SizeTieredCompactionTask(
- COMPACTION_TEST_SG, "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
- tsFileManager.writeLock("test");
- try {
- new Thread(
- () -> {
- try {
- task.call();
- } catch (Exception e) {
-
- }
- })
- .start();
- Thread.sleep(4_000);
- } catch (Exception e) {
- } finally {
- tsFileManager.writeUnlock();
- }
- Assert.assertTrue(tsFileManager.isAllowCompaction());
- Assert.assertEquals(10, tsFileManager.getTsFileList(true).size());
- } finally {
- IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(60_000);
- }
- }
-
- @Test
- public void testHandleExceptionTargetNotCompleteAndSourceNotExists() {
- tsFileManager.addAll(seqResources, true);
- tsFileManager.addAll(unseqResources, false);
- SizeTieredCompactionTask task =
- new SizeTieredCompactionTask(
- COMPACTION_TEST_SG, "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
- tsFileManager.writeLock("test");
- try {
- seqResources.get(seqResources.size() - 1).remove();
- new Thread(
- () -> {
- try {
- task.call();
- } catch (Exception e) {
-
- }
- })
- .start();
- Thread.sleep(5_000);
- } catch (Exception e) {
- } finally {
- tsFileManager.writeUnlock();
- }
- Assert.assertFalse(tsFileManager.isAllowCompaction());
- Assert.assertEquals(10, tsFileManager.getTsFileList(true).size());
- }
-
- @Test
- public void testHandleExceptionTargetCompleteAndSourceNotExists() {
- IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(10_000);
- try {
- tsFileManager.addAll(seqResources, true);
- tsFileManager.addAll(unseqResources, false);
- SizeTieredCompactionTask task =
- new SizeTieredCompactionTask(
- COMPACTION_TEST_SG, "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
- tsFileManager.writeLock("test");
- try {
- new Thread(
- () -> {
- try {
- task.call();
- } catch (Exception e) {
-
- }
- })
- .start();
- Thread.sleep(8_000);
- seqResources.get(0).remove();
- tsFileManager.getTsFileList(true).remove(seqResources.get(0));
- Thread.sleep(3_000);
- } catch (Exception e) {
- } finally {
- tsFileManager.writeUnlock();
- }
- Assert.assertTrue(tsFileManager.isAllowCompaction());
- Assert.assertEquals(1, tsFileManager.getTsFileList(true).size());
- } finally {
- IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(60_000L);
- }
- }
-
- @Test
- public void testHandleExceptionTargetNotCompleteAndSourceExists() {
- IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(10_000L);
- try {
- tsFileManager.addAll(seqResources, true);
- tsFileManager.addAll(unseqResources, false);
- SizeTieredCompactionTask task =
- new SizeTieredCompactionTask(
- COMPACTION_TEST_SG, "0", 0, tsFileManager, seqResources, true, new AtomicInteger(0));
- tsFileManager.writeLock("test");
- try {
- new Thread(
- () -> {
- try {
- task.call();
- } catch (Exception e) {
- }
- })
- .start();
- Thread.sleep(8_000);
- File targetFile =
- TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true)
- .getTsFile();
- FileChannel channel = new FileOutputStream(targetFile, true).getChannel();
- channel.truncate(10);
- channel.close();
- Thread.sleep(3_000);
- } catch (Exception e) {
- } finally {
- tsFileManager.writeUnlock();
- }
- Assert.assertTrue(tsFileManager.isAllowCompaction());
- Assert.assertEquals(10, tsFileManager.getTsFileList(true).size());
- } finally {
- IoTDBDescriptor.getInstance().getConfig().setCompactionAcquireWriteLockTimeout(60_000L);
- }
- }
-}