You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/27 11:23:13 UTC
[iotdb] branch rel/0.11 updated: [ref/0.11] change merge recover to
async (#2327)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 652154a [ref/0.11] change merge recover to async (#2327)
652154a is described below
commit 652154af31f062c57ef92d212c71679aac8673df
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Sun Dec 27 19:22:49 2020 +0800
[ref/0.11] change merge recover to async (#2327)
---
.../compaction/CompactionMergeTaskPoolManager.java | 3 +-
.../db/engine/compaction/TsFileManagement.java | 15 ++++++
.../engine/storagegroup/StorageGroupProcessor.java | 56 ++++++++++++++--------
.../java/org/apache/iotdb/db/service/IoTDB.java | 4 +-
4 files changed, 54 insertions(+), 24 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 830ea56..3473d54 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
@@ -112,7 +111,7 @@ public class CompactionMergeTaskPoolManager implements IService {
return ServiceType.COMPACTION_SERVICE;
}
- public void submitTask(CompactionMergeTask compactionMergeTask)
+ public void submitTask(Runnable compactionMergeTask)
throws RejectedExecutionException {
if (pool != null && !pool.isTerminated()) {
pool.submit(compactionMergeTask);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index d10b4c1..f7f68c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -178,6 +178,21 @@ public abstract class TsFileManagement {
}
}
+ public class CompactionRecoverTask implements Runnable {
+
+ private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
+
+ public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+ this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
+ }
+
+ @Override
+ public void run() {
+ recover();
+ closeCompactionMergeCallBack.call();
+ }
+ }
+
public void merge(boolean fullMerge, List<TsFileResource> seqMergeList,
List<TsFileResource> unSeqMergeList, long dataTTL) {
if (isUnseqMerging) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfe409d..a91661b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -335,8 +335,7 @@ public class StorageGroupProcessor {
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
mergingMods.delete();
}
- tsFileManagement.recover();
-
+ recoverCompaction();
for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
@@ -372,6 +371,24 @@ public class StorageGroupProcessor {
}
}
+ private void recoverCompaction() {
+ if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
+ compactionMergeWorking = true;
+ logger.info("{} submit a compaction merge task", storageGroupName);
+ try {
+ CompactionMergeTaskPoolManager.getInstance()
+ .submitTask(
+ tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+ } catch (RejectedExecutionException e) {
+ this.closeCompactionMergeCallBack();
+ logger.error("{} compaction submit task failed", storageGroupName);
+ }
+ } else {
+ logger.error("{} compaction pool not started ,recover failed",
+ storageGroupName);
+ }
+ }
+
private void updatePartitionFileVersion(long partitionNum, long fileVersion) {
long oldVersion = partitionMaxFileVersions.getOrDefault(partitionNum, 0L);
if (fileVersion > oldVersion) {
@@ -583,8 +600,6 @@ public class StorageGroupProcessor {
continue;
}
-
-
if (i != tsFiles.size() - 1 || !writer.canWrite()) {
// not the last file or cannot write, just close it
tsFileResource.setClosed(true);
@@ -810,11 +825,11 @@ public class StorageGroupProcessor {
* inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
@@ -974,9 +989,9 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1040,7 +1055,8 @@ public class StorageGroupProcessor {
tsFileProcessor = new TsFileProcessor(storageGroupName,
fsFactory.getFileWithParent(filePath), storageGroupInfo,
versionController, this::closeUnsealedTsFileProcessorCallBack,
- this::updateLatestFlushTimeCallback, true, partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
+ this::updateLatestFlushTimeCallback, true,
+ partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -1052,7 +1068,8 @@ public class StorageGroupProcessor {
tsFileProcessor = new TsFileProcessor(storageGroupName,
fsFactory.getFileWithParent(filePath), storageGroupInfo,
versionController, this::closeUnsealedTsFileProcessorCallBack,
- this::unsequenceFlushCallback, false, partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
+ this::unsequenceFlushCallback, false,
+ partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -1490,10 +1507,9 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param path the timeseries path of the to be deleted.
+ * @param path the timeseries path of the to be deleted.
* @param startTime the startTime of delete range.
- * @param endTime the endTime of delete range.
- * @param planIndex
+ * @param endTime the endTime of delete range.
*/
public void delete(PartialPath path, long startTime, long endTime, long planIndex)
throws IOException {
@@ -2112,9 +2128,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
* version number is the version number in the tsfile with a larger timestamp.
*
- * @param tsfileName origin tsfile name
+ * @param tsfileName origin tsfile name
* @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
- * 1]
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2178,8 +2194,8 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index f56e7ca..dc05c1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -106,6 +106,8 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(Measurement.INSTANCE);
registerManager.register(TVListAllocator.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
+ registerManager.register(MergeManager.getINSTANCE());
+ registerManager.register(CompactionMergeTaskPoolManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(StorageEngine.getInstance());
@@ -138,8 +140,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(SyncServerManager.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
- registerManager.register(MergeManager.getINSTANCE());
- registerManager.register(CompactionMergeTaskPoolManager.getInstance());
logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!");
}