You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/09 15:47:40 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] Avoid Compaction blocking Flush (Fix CI not stop problem) (#3695)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new e9aea37 [To rel/0.12] Avoid Compaction blocking Flush (Fix CI not stop problem) (#3695)
e9aea37 is described below
commit e9aea37a0f8e00b5455a664a2e08ee58022a9f80
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Mon Aug 9 23:47:12 2021 +0800
[To rel/0.12] Avoid Compaction blocking Flush (Fix CI not stop problem) (#3695)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
.../compaction/CompactionMergeTaskPoolManager.java | 41 +++-------
.../db/engine/compaction/TsFileManagement.java | 32 ++++----
.../level/LevelCompactionTsFileManagement.java | 12 ++-
.../iotdb/db/engine/merge/manage/MergeManager.java | 16 ----
.../merge/task/CompactionMergeRecoverTask.java | 91 ++++++++++++++++++++++
.../iotdb/db/engine/merge/task/MergeFileTask.java | 8 +-
.../db/engine/merge/task/RecoverMergeTask.java | 4 +-
.../engine/storagegroup/StorageGroupProcessor.java | 87 ++++++---------------
.../version/SimpleFileVersionController.java | 2 +-
.../db/integration/IoTDBAutoCreateSchemaIT.java | 4 +-
.../db/integration/IoTDBCreateStorageGroupIT.java | 2 +-
.../db/integration/IoTDBCreateTimeseriesIT.java | 4 +-
.../db/integration/IoTDBNewTsFileCompactionIT.java | 58 +++++++++++++-
.../aggregation/IoTDBAggregationSmallDataIT.java | 5 +-
.../apache/iotdb/db/writelog/PerformanceTest.java | 2 +-
17 files changed, 227 insertions(+), 147 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 999b240..7e7be3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -384,7 +384,7 @@ public class IoTDBConfig {
private int queryTimeoutThreshold = 60000;
/** compaction interval in ms */
- private long compactionInterval = 30000;
+ private long compactionInterval = 10000;
/** Replace implementation class of JDBC service */
private String rpcImplClassName = TSServiceImpl.class.getName();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index b4f79d3..96933e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -284,9 +284,7 @@ public class StorageEngine implements IService {
storageGroup.getFullPath());
} catch (Exception e) {
logger.error(
- "meet error when recovering storage group: {}",
- storageGroup.getFullPath(),
- e);
+ "meet error when recovered storage group: {}", storageGroup.getFullPath(), e);
}
return null;
}));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index e347300..cdbf180 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -37,11 +37,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
@@ -53,8 +51,7 @@ public class CompactionMergeTaskPoolManager implements IService {
LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
private static final CompactionMergeTaskPoolManager INSTANCE =
new CompactionMergeTaskPoolManager();
- private ScheduledExecutorService scheduledPool;
- private ExecutorService pool;
+ private ScheduledThreadPoolExecutor pool;
private Map<String, List<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -68,13 +65,10 @@ public class CompactionMergeTaskPoolManager implements IService {
public void start() {
if (pool == null) {
this.pool =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
- ThreadName.COMPACTION_SERVICE.getName());
- this.scheduledPool =
- IoTDBThreadPoolFactory.newScheduledThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
- ThreadName.COMPACTION_SERVICE.getName());
+ (ScheduledThreadPoolExecutor)
+ IoTDBThreadPoolFactory.newScheduledThreadPool(
+ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
+ ThreadName.COMPACTION_SERVICE.getName());
}
logger.info("Compaction task manager started.");
}
@@ -83,7 +77,6 @@ public class CompactionMergeTaskPoolManager implements IService {
public void stop() {
if (pool != null) {
pool.shutdownNow();
- scheduledPool.shutdownNow();
logger.info("Waiting for task pool to shut down");
waitTermination();
storageGroupTasks.clear();
@@ -93,7 +86,6 @@ public class CompactionMergeTaskPoolManager implements IService {
@Override
public void waitAndStop(long milliseconds) {
if (pool != null) {
- awaitTermination(scheduledPool, milliseconds);
awaitTermination(pool, milliseconds);
logger.info("Waiting for task pool to shut down");
waitTermination();
@@ -149,7 +141,6 @@ public class CompactionMergeTaskPoolManager implements IService {
}
}
pool = null;
- scheduledPool = null;
storageGroupTasks.clear();
logger.info("CompactionManager stopped");
}
@@ -195,25 +186,13 @@ public class CompactionMergeTaskPoolManager implements IService {
}
public void init(Runnable function) {
- scheduledPool.scheduleWithFixedDelay(
+ pool.scheduleWithFixedDelay(
function, 1000, config.getCompactionInterval(), TimeUnit.MILLISECONDS);
}
- public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
- throws RejectedExecutionException {
- if (pool != null && !pool.isTerminated()) {
- String storageGroup = storageGroupCompactionTask.getStorageGroupName();
- boolean isCompacting = sgCompactionStatus.computeIfAbsent(storageGroup, k -> false);
- if (isCompacting) {
- return;
- }
- storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
- sgCompactionStatus.put(storageGroup, true);
- Future<Void> future = pool.submit(storageGroupCompactionTask);
- storageGroupTasks
- .computeIfAbsent(storageGroup, k -> new CopyOnWriteArrayList<>())
- .add(future);
- }
+ @TestOnly
+ public synchronized int getCompactionTaskNum() {
+ return pool.getActiveCount();
}
public boolean isTerminated() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index dde54a7..838006c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -63,14 +63,13 @@ public abstract class TsFileManagement {
public volatile boolean isUnseqMerging = false;
public volatile boolean isSeqMerging = false;
+ public volatile boolean recovered = false;
/**
* This is the modification file of the result of the current merge. Because the merged file may
* be invisible at this moment, without this, deletion/update during merge could be lost.
*/
public ModificationFile mergingModification;
- private long mergeStartTime;
-
/** whether execute merge chunk in this task */
protected boolean isMergeExecutedInCurrentTask = false;
@@ -258,7 +257,7 @@ public abstract class TsFileManagement {
tsFileResource.setMerging(true);
}
- mergeStartTime = System.currentTimeMillis();
+ long mergeStartTime = System.currentTimeMillis();
MergeTask mergeTask =
new MergeTask(
mergeResource,
@@ -279,17 +278,6 @@ public abstract class TsFileManagement {
mergeFiles[0].size(),
mergeFiles[1].size());
}
- // wait until unseq merge has finished
- while (isUnseqMerging) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- logger.error("{} [Compaction] shutdown", storageGroupName, e);
- Thread.currentThread().interrupt();
- return false;
- }
- }
- return true;
} catch (MergeException | IOException e) {
logger.error("{} cannot select file for merge", storageGroupName, e);
return false;
@@ -297,6 +285,17 @@ public abstract class TsFileManagement {
} finally {
writeUnlock();
}
+ // wait until unseq merge has finished
+ while (isUnseqMerging) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.error("{} [Compaction] shutdown", storageGroupName, e);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ return true;
}
private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) {
@@ -423,7 +422,10 @@ public abstract class TsFileManagement {
File mergedFile =
FSFactoryProducer.getFSFactory().getFile(seqFile.getTsFilePath() + MERGE_SUFFIX);
if (mergedFile.exists()) {
- mergedFile.delete();
+ boolean deletionSuccess = mergedFile.delete();
+ if (!deletionSuccess) {
+ logger.warn("fail to delete {}", mergedFile);
+ }
}
updateMergeModification(seqFile);
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 5f260f3..52141cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -444,6 +444,13 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (fullMerge) {
// get tsfile resource from list, as they have been recovered in StorageGroupProcessor
TsFileResource targetTsFileResource = getRecoverTsFileResource(targetFile, isSeq);
+ if (targetTsFileResource == null) {
+ targetTsFileResource = getTsFileResource(targetFile, isSeq);
+ if (targetTsFileResource == null) {
+ logger.warn("get null targetTsFileResource");
+ return;
+ }
+ }
long timePartition = targetTsFileResource.getTimePartition();
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
// if not complete compaction, resume merge
@@ -612,6 +619,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
@Override
protected void merge(long timePartition) {
+ // compacting sequence file in one time partition
isMergeExecutedInCurrentTask =
merge(
forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
@@ -888,7 +896,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (targetFile.exists()) {
logger.error(
"{} restore delete target file {} ", storageGroupName, targetFile.getName());
- targetFile.delete();
+ if (!targetFile.delete()) {
+ logger.warn("fail to delete {}", targetFile);
+ }
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index f9112e1..aef6e2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -68,7 +68,6 @@ public class MergeManager implements IService, MergeManagerMBean {
private AtomicInteger threadCnt = new AtomicInteger();
private ThreadPoolExecutor mergeTaskPool;
private ThreadPoolExecutor mergeChunkSubTaskPool;
- private ScheduledExecutorService timedMergeThreadPool;
private ScheduledExecutorService taskCleanerThreadPool;
private Map<String, Set<MergeFuture>> storageGroupMainTasks = new ConcurrentHashMap<>();
@@ -143,13 +142,6 @@ public class MergeManager implements IService, MergeManagerMBean {
new MergeThreadPool(
threadNum * chunkSubThreadNum,
r -> new Thread(r, "MergeChunkSubThread-" + threadCnt.getAndIncrement()));
- long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
- if (mergeInterval > 0) {
- timedMergeThreadPool =
- Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "TimedMergeThread"));
- timedMergeThreadPool.scheduleAtFixedRate(
- this::mergeAll, mergeInterval, mergeInterval, TimeUnit.SECONDS);
- }
taskCleanerThreadPool =
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MergeTaskCleaner"));
@@ -161,10 +153,6 @@ public class MergeManager implements IService, MergeManagerMBean {
@Override
public void stop() {
if (mergeTaskPool != null) {
- if (timedMergeThreadPool != null) {
- timedMergeThreadPool.shutdownNow();
- timedMergeThreadPool = null;
- }
taskCleanerThreadPool.shutdownNow();
taskCleanerThreadPool = null;
mergeTaskPool.shutdownNow();
@@ -199,10 +187,6 @@ public class MergeManager implements IService, MergeManagerMBean {
@Override
public void waitAndStop(long milliseconds) {
if (mergeTaskPool != null) {
- if (timedMergeThreadPool != null) {
- awaitTermination(timedMergeThreadPool, milliseconds);
- timedMergeThreadPool = null;
- }
awaitTermination(taskCleanerThreadPool, milliseconds);
taskCleanerThreadPool = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
new file mode 100644
index 0000000..8385934
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class CompactionMergeRecoverTask implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(CompactionMergeRecoverTask.class);
+
+ private TsFileManagement.CompactionRecoverTask compactionRecoverTask;
+ private RecoverMergeTask recoverMergeTask;
+ private TsFileManagement tsFileManagement;
+ private String storageGroupSysDir;
+ private String storageGroupName;
+
+ public CompactionMergeRecoverTask(
+ TsFileManagement tsFileManagement,
+ List<TsFileResource> seqFiles,
+ List<TsFileResource> unseqFiles,
+ String storageGroupSysDir,
+ MergeCallback callback,
+ String taskName,
+ boolean fullMerge,
+ String storageGroupName,
+ StorageGroupProcessor.CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+ this.tsFileManagement = tsFileManagement;
+ this.compactionRecoverTask =
+ this.tsFileManagement.new CompactionRecoverTask(closeCompactionMergeCallBack);
+ this.storageGroupSysDir = storageGroupSysDir;
+ this.storageGroupName = storageGroupName;
+ this.recoverMergeTask =
+ new RecoverMergeTask(
+ seqFiles,
+ unseqFiles,
+ storageGroupSysDir,
+ callback,
+ taskName,
+ fullMerge,
+ storageGroupName);
+ }
+
+ @Override
+ public void run() {
+ tsFileManagement.recovered = false;
+ try {
+ recoverMergeTask.recoverMerge(
+ IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+ File mergingMods =
+ SystemFileFactory.INSTANCE.getFile(
+ storageGroupSysDir, StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME);
+ if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
+ mergingMods.delete();
+ }
+ } catch (MetadataException | IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ compactionRecoverTask.call();
+ tsFileManagement.recovered = true;
+ logger.info("{} Compaction recover finish", storageGroupName);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 0c4241b..34735c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -207,7 +207,9 @@ public class MergeFileTask {
mergeLogger.logFileMergeEnd();
logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
- newFileWriter.getFile().delete();
+ if (!newFileWriter.getFile().delete()) {
+ logger.warn("fail to delete {}", newFileWriter.getFile());
+ }
// change tsFile name
File nextMergeVersionFile = modifyTsFileNameUnseqMergCnt(seqFile.getTsFile());
fsFactory.moveFile(seqFile.getTsFile(), nextMergeVersionFile);
@@ -358,7 +360,9 @@ public class MergeFileTask {
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
// change tsFile name
- seqFile.getTsFile().delete();
+ if (!seqFile.getTsFile().delete()) {
+ logger.warn("fail to delete {}", seqFile.getTsFile());
+ }
File nextMergeVersionFile = modifyTsFileNameUnseqMergCnt(seqFile.getTsFile());
fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile);
fsFactory.moveFile(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 32cd897..976c374 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -205,11 +205,11 @@ public class RecoverMergeTask extends MergeTask {
// scan the metadata to compute how many chunks are merged/unmerged so at last we can decide to
// move the merged chunks or the unmerged chunks
private void recoverChunkCounts() throws IOException {
- logger.info("{} recovering chunk counts", taskName);
+ logger.info("{} recovered chunk counts", taskName);
int fileCnt = 1;
for (TsFileResource tsFileResource : resource.getSeqFiles()) {
logger.info(
- "{} recovering {} {}/{}",
+ "{} recovered {} {}/{}",
taskName,
tsFileResource.getTsFile().getName(),
fileCnt,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 789092a..840a482 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
-import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.task.CompactionMergeRecoverTask;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -103,7 +103,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -487,27 +486,21 @@ public class StorageGroupProcessor {
this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
}
- RecoverMergeTask recoverMergeTask =
- new RecoverMergeTask(
+ CompactionMergeRecoverTask recoverTask =
+ new CompactionMergeRecoverTask(
+ tsFileManagement,
new ArrayList<>(tsFileManagement.getTsFileList(true)),
tsFileManagement.getTsFileList(false),
storageGroupSysDir.getPath(),
tsFileManagement::mergeEndAction,
taskName,
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
- logicalStorageGroupName);
- logger.info(
- "{} - {} a RecoverMergeTask {} starts...",
- logicalStorageGroupName,
- virtualStorageGroupId,
- taskName);
- recoverMergeTask.recoverMerge(
- IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
- if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
- mergingMods.delete();
- }
- recoverCompaction();
- } catch (IOException | MetadataException e) {
+ logicalStorageGroupName,
+ this::closeCompactionRecoverCallBack);
+
+ new Thread(recoverTask).start();
+ logger.info("submit a compaction merge recover task");
+ } catch (IOException e) {
throw new StorageGroupProcessorException(e);
}
}
@@ -529,35 +522,6 @@ public class StorageGroupProcessor {
}
}
- private void recoverCompaction() {
- if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
- == CompactionStrategy.NO_COMPACTION) {
- return;
- }
- if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
- logger.info(
- "{} - {} submit a compaction recover merge task",
- logicalStorageGroupName,
- virtualStorageGroupId);
- try {
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(
- tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
- } catch (RejectedExecutionException e) {
- this.closeCompactionRecoverCallBack(false, 0);
- logger.error(
- "{} - {} compaction submit task failed",
- logicalStorageGroupName,
- virtualStorageGroupId,
- e);
- }
- } else {
- logger.error(
- "{} compaction pool not started ,recover failed",
- logicalStorageGroupName + "-" + virtualStorageGroupId);
- }
- }
-
private void updatePartitionFileVersion(long partitionNum, long fileVersion) {
long oldVersion = partitionMaxFileVersions.getOrDefault(partitionNum, 0L);
if (fileVersion > oldVersion) {
@@ -626,7 +590,7 @@ public class StorageGroupProcessor {
}
// old version
- // some TsFileResource may be being persisted when the system crashed, try recovering such
+ // some TsFileResource may be being persisted when the system crashed, try recovered such
// resources
continueFailedRenames(fileFolder, TEMP_SUFFIX);
@@ -640,7 +604,7 @@ public class StorageGroupProcessor {
if (!partitionFolder.isDirectory()) {
logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
} else if (!partitionFolder.getName().equals(IoTDBConstant.UPGRADE_FOLDER_NAME)) {
- // some TsFileResource may be being persisted when the system crashed, try recovering
+ // some TsFileResource may be being persisted when the system crashed, try recovered
// such
// resources
continueFailedRenames(partitionFolder, TEMP_SUFFIX);
@@ -1956,14 +1920,6 @@ public class StorageGroupProcessor {
logger.info(
"signal closing storage group condition in {}",
logicalStorageGroupName + "-" + virtualStorageGroupId);
-
- if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
- == CompactionStrategy.LEVEL_COMPACTION) {
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(
- new CompactionOnePartitionTask(
- logicalStorageGroupName, tsFileProcessor.getTimeRangeId()));
- }
}
public class CompactionOnePartitionTask extends StorageGroupCompactionTask {
@@ -2005,14 +1961,12 @@ public class StorageGroupProcessor {
return;
}
CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(logicalStorageGroupName);
- if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- logger.info(
- "{}-{} recover finished, submit continuous compaction task",
- logicalStorageGroupName,
- virtualStorageGroupId);
+ logger.info(
+ "{}-{} recover finished, submit scheduled compaction task",
+ logicalStorageGroupName,
+ virtualStorageGroupId);
- CompactionMergeTaskPoolManager.getInstance().init(this::merge);
- }
+ CompactionMergeTaskPoolManager.getInstance().init(this::merge);
}
/** close compaction merge callback, to release some locks */
@@ -2119,9 +2073,12 @@ public class StorageGroupProcessor {
}
public void merge() {
+ if (!tsFileManagement.recovered) {
+ // doing recovered task
+ return;
+ }
if (config.getCompactionStrategy() == CompactionStrategy.LEVEL_COMPACTION) {
- CompactionMergeTaskPoolManager.getInstance()
- .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
+ new CompactionAllPartitionTask(logicalStorageGroupName).call();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 0a73f16..aafe0ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -36,7 +36,7 @@ public class SimpleFileVersionController implements VersionController {
public static final String UPGRADE_DIR = "upgrade";
/**
* Every time currVersion - prevVersion >= saveInterval, currVersion is persisted and prevVersion
- * is set to currVersion. When recovering from file, the version number is automatically increased
+ * is set to currVersion. When recovered from file, the version number is automatically increased
* by saveInterval to avoid conflicts.
*/
private static long saveInterval = 100;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAutoCreateSchemaIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAutoCreateSchemaIT.java
index abac508..912fa33 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAutoCreateSchemaIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAutoCreateSchemaIT.java
@@ -174,7 +174,7 @@ public class IoTDBAutoCreateSchemaIT {
EnvironmentUtils.stopDaemon();
setUp();
- // ensure that insert data in cache is right after recovering.
+ // ensure that insert data in cache is right after recovered.
insertAutoCreate1Tool();
}
@@ -222,7 +222,7 @@ public class IoTDBAutoCreateSchemaIT {
EnvironmentUtils.stopDaemon();
setUp();
- // ensure that storage group in cache is right after recovering.
+ // ensure that storage group in cache is right after recovered.
InsertAutoCreate2Tool(storageGroup, timeSeriesPrefix);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateStorageGroupIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateStorageGroupIT.java
index 28d75d1..4333199 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateStorageGroupIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateStorageGroupIT.java
@@ -75,7 +75,7 @@ public class IoTDBCreateStorageGroupIT {
EnvironmentUtils.stopDaemon();
setUp();
- // ensure StorageGroup in cache is right after recovering.
+ // ensure StorageGroup in cache is right after recovered.
createStorageGroupTool(storageGroups);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateTimeseriesIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateTimeseriesIT.java
index 5174419..ba076bb 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateTimeseriesIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateTimeseriesIT.java
@@ -82,7 +82,7 @@ public class IoTDBCreateTimeseriesIT {
EnvironmentUtils.stopDaemon();
setUp();
- // ensure timeseries in cache is right after recovering.
+ // ensure timeseries in cache is right after recovered.
createTimeSeries1Tool(timeSeriesArray);
}
@@ -129,7 +129,7 @@ public class IoTDBCreateTimeseriesIT {
EnvironmentUtils.stopDaemon();
setUp();
- // ensure storage group in cache is right after recovering.
+ // ensure storage group in cache is right after recovered.
createTimeSeries2Tool(storageGroup);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
index c327ef1..7bc0c81 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.integration;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
@@ -32,6 +33,8 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -46,6 +49,8 @@ import static org.junit.Assert.fail;
public class IoTDBNewTsFileCompactionIT {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBNewTsFileCompactionIT.class);
+
private int prevSeqLevelFileNum;
private int prevSeqLevelNum;
private int prevMergePagePointNumber;
@@ -53,7 +58,7 @@ public class IoTDBNewTsFileCompactionIT {
private CompactionStrategy preCompactionStrategy;
private PartialPath storageGroupPath;
// the unit is ns
- private static final long MAX_WAIT_TIME_FOR_MERGE = Long.MAX_VALUE;
+ private static final long MAX_WAIT_TIME_FOR_MERGE = 1L * 60L * 1000L * 1000L * 1000L;
private static final float FLOAT_DELTA = 0.00001f;
@Before
@@ -74,6 +79,7 @@ public class IoTDBNewTsFileCompactionIT {
IoTDBDescriptor.getInstance()
.getConfig()
.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setCompactionThreadNum(10);
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -127,7 +133,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -181,7 +189,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -234,7 +244,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -293,7 +305,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -347,7 +361,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -406,7 +422,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -461,7 +479,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -523,7 +543,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -578,7 +600,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -635,7 +659,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -697,7 +723,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -759,7 +787,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -819,7 +849,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -882,7 +914,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -945,7 +979,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -1011,7 +1047,9 @@ public class IoTDBNewTsFileCompactionIT {
statement.execute("INSERT INTO root.sg1.d1(time,s1) values(8, 8)");
statement.execute("FLUSH");
+ LOGGER.warn("Waiting for merge to finish");
assertTrue(waitForMergeFinish());
+ LOGGER.warn("Merge Finish");
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -1043,14 +1081,32 @@ public class IoTDBNewTsFileCompactionIT {
(LevelCompactionTsFileManagement) storageGroupProcessor.getTsFileManagement();
long startTime = System.nanoTime();
+ long intervalTime = startTime;
// get the size of level 1's tsfile list to judge whether merge is finished
while (tsFileManagement.getSequenceTsFileResources().get(0L).size() < 2
|| tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size() != 1) {
TimeUnit.MILLISECONDS.sleep(100);
// waited too long: log the timeout and fail the test
if ((System.nanoTime() - startTime) >= MAX_WAIT_TIME_FOR_MERGE) {
+ LOGGER.error("Unable to wait for compaction finish");
+ fail();
break;
}
+ if ((System.nanoTime() - intervalTime) >= 20L * 1000L * 1000L * 1000L) {
+ intervalTime = System.nanoTime();
+ LOGGER.warn(
+ "The number of tsfile level: {}",
+ tsFileManagement.getSequenceTsFileResources().get(0L).size());
+ LOGGER.warn(
+ "The number of tsfile in level 0: {}",
+ tsFileManagement.getSequenceTsFileResources().get(0L).get(0).size());
+ LOGGER.warn(
+ "The number of tsfile in level 1: {}",
+ tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size());
+ LOGGER.warn(
+ "The number of current compaction task num {}",
+ CompactionMergeTaskPoolManager.getInstance().getCompactionTaskNum());
+ }
}
return tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size() == 1;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
index a077104..cca305d 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
@@ -225,9 +225,8 @@ public class IoTDBAggregationSmallDataIT {
fail();
} catch (IoTDBSQLException e) {
Assert.assertTrue(
- e.toString()
- .contains(
- "500: [INTERNAL_SERVER_ERROR] Exception occurred while executing executeStatement. Binary statistics does not support: max"));
+ e.toString().contains("500: [INTERNAL_SERVER_ERROR] Exception occurred while executing")
+ && e.toString().contains("Binary statistics does not support: max"));
}
boolean hasResultSet =
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index 6a1df39..e726496 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -205,7 +205,7 @@ public class PerformanceTest {
logNode.forceSync();
long time = System.currentTimeMillis();
System.out.println(
- 3000000 + " logs use " + (System.currentTimeMillis() - time) + "ms when recovering ");
+ 3000000 + " logs use " + (System.currentTimeMillis() - time) + "ms when recovered ");
} finally {
ByteBuffer[] array = logNode.delete();
for (ByteBuffer byteBuffer : array) {