You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/12/17 16:14:12 UTC
[iotdb] branch master updated: [IOTDB-2162] Simplify the recovery merge process (#4575)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f5106d [IOTDB-2162] Simplify the recovery merge process (#4575)
6f5106d is described below
commit 6f5106da61754f094efeb87a80c2012541968bab
Author: lisijia <44...@users.noreply.github.com>
AuthorDate: Sat Dec 18 00:13:04 2021 +0800
[IOTDB-2162] Simplify the recovery merge process (#4575)
---
.../inplace/InplaceCompactionRecoverTask.java | 11 +-
.../task/CleanLastCrossSpaceCompactionTask.java | 67 +++++
.../cross/inplace/task/RecoverCrossMergeTask.java | 305 ---------------------
3 files changed, 73 insertions(+), 310 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionRecoverTask.java
index 2981044..d0303dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionRecoverTask.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.engine.compaction.cross.inplace;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.cross.AbstractCrossSpaceCompactionRecoverTask;
-import org.apache.iotdb.db.engine.compaction.cross.inplace.task.RecoverCrossMergeTask;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.task.CleanLastCrossSpaceCompactionTask;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -80,8 +80,8 @@ public class InplaceCompactionRecoverTask extends InplaceCompactionTask {
while (unSeqIterator.hasNext()) {
unSeqFileList.add(unSeqIterator.next());
}
- RecoverCrossMergeTask recoverCrossMergeTask =
- new RecoverCrossMergeTask(
+ CleanLastCrossSpaceCompactionTask cleanLastCrossSpaceCompactionTask =
+ new CleanLastCrossSpaceCompactionTask(
seqFileList,
unSeqFileList,
storageGroupDir,
@@ -89,8 +89,9 @@ public class InplaceCompactionRecoverTask extends InplaceCompactionTask {
taskName,
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
logicalStorageGroupName);
- LOGGER.info("{} a RecoverMergeTask {} starts...", fullStorageGroupName, taskName);
- recoverCrossMergeTask.recoverMerge(
+ LOGGER.info(
+ "{} a CleanLastCrossSpaceCompactionTask {} starts...", fullStorageGroupName, taskName);
+ cleanLastCrossSpaceCompactionTask.cleanLastCrossSpaceCompactionInfo(
IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot(), logFile);
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
for (TsFileResource seqFile : seqFileList) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/task/CleanLastCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/task/CleanLastCrossSpaceCompactionTask.java
new file mode 100644
index 0000000..a174840
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/task/CleanLastCrossSpaceCompactionTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.cross.inplace.task;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Cleans up what the last cross-space (inplace) compaction left behind.
+ *
+ * <p>This task does not analyze the compaction log or resume the unfinished merge: when the log
+ * file exists it only delegates to the {@code cleanUp} method inherited from {@link
+ * CrossSpaceMergeTask}, which decides what is removed or kept.
+ */
+public class CleanLastCrossSpaceCompactionTask extends CrossSpaceMergeTask {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(CleanLastCrossSpaceCompactionTask.class);
+
+  /**
+   * Creates the task; every argument is forwarded unchanged to {@link CrossSpaceMergeTask}.
+   *
+   * @param seqFiles sequence file resources of the compaction to clean up
+   * @param unseqFiles unsequence file resources of the compaction to clean up
+   * @param storageGroupSysDir storage group system directory
+   * @param callback merge callback
+   * @param taskName name used as the prefix of this task's log messages
+   * @param fullMerge full-merge flag
+   * @param storageGroupName storage group name
+   */
+  public CleanLastCrossSpaceCompactionTask(
+      List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles,
+      String storageGroupSysDir,
+      MergeCallback callback,
+      String taskName,
+      boolean fullMerge,
+      String storageGroupName) {
+    super(
+        seqFiles, unseqFiles, storageGroupSysDir, callback, taskName, fullMerge, storageGroupName);
+  }
+
+  /**
+   * Cleans up after the last cross-space compaction if its log file is still present.
+   *
+   * <p>When {@code logFile} does not exist there is nothing to clean, so the method only logs and
+   * returns. Otherwise it runs the inherited {@code cleanUp(continueMerge)} and logs how long that
+   * took.
+   *
+   * @param continueMerge forwarded unchanged to {@code cleanUp}
+   * @param logFile compaction log whose existence decides whether clean-up runs
+   * @throws IOException propagated from the inherited {@code cleanUp}
+   */
+  public void cleanLastCrossSpaceCompactionInfo(boolean continueMerge, File logFile)
+      throws IOException {
+    if (!logFile.exists()) {
+      logger.info("{} no merge.log, cross space compaction clean ends.", taskName);
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+    cleanUp(continueMerge);
+    if (logger.isInfoEnabled()) {
+      logger.info(
+          "{} cross space compaction clean ends after {}ms.",
+          taskName,
+          (System.currentTimeMillis() - startTime));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/task/RecoverCrossMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/task/RecoverCrossMergeTask.java
deleted file mode 100644
index f15d298..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/task/RecoverCrossMergeTask.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.compaction.cross.inplace.task;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.compaction.cross.inplace.recover.InplaceCompactionLogAnalyzer;
-import org.apache.iotdb.db.engine.compaction.cross.inplace.recover.InplaceCompactionLogAnalyzer.Status;
-import org.apache.iotdb.db.engine.compaction.cross.inplace.recover.InplaceCompactionLogger;
-import org.apache.iotdb.db.engine.compaction.cross.inplace.selector.MaxSeriesMergeFileSelector;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.utils.MergeUtils;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
-/**
- * RecoverCrossMergeTask is an extension of MergeTask, which resumes the last merge progress by
- * scanning merge.log using LogAnalyzer and continue the unfinished merge.
- */
-public class RecoverCrossMergeTask extends CrossSpaceMergeTask {
-
- private static final Logger logger = LoggerFactory.getLogger(RecoverCrossMergeTask.class);
-
- private InplaceCompactionLogAnalyzer analyzer;
-
- public RecoverCrossMergeTask(
- List<TsFileResource> seqFiles,
- List<TsFileResource> unseqFiles,
- String storageGroupSysDir,
- MergeCallback callback,
- String taskName,
- boolean fullMerge,
- String storageGroupName) {
- super(
- seqFiles, unseqFiles, storageGroupSysDir, callback, taskName, fullMerge, storageGroupName);
- }
-
- public void recoverMerge(boolean continueMerge, File logFile)
- throws IOException, MetadataException {
- if (!logFile.exists()) {
- logger.info("{} no merge.log, merge recovery ends", taskName);
- return;
- }
- long startTime = System.currentTimeMillis();
-
- analyzer = new InplaceCompactionLogAnalyzer(resource, taskName, logFile, storageGroupName);
- Status status = analyzer.analyze();
- if (logger.isInfoEnabled()) {
- logger.info(
- "{} merge recovery status determined: {} after {}ms",
- taskName,
- status,
- (System.currentTimeMillis() - startTime));
- }
- switch (status) {
- case NONE:
- logFile.delete();
- break;
- case MERGE_START:
- resumeAfterFilesLogged(continueMerge);
- break;
- case ALL_TS_MERGED:
- resumeAfterAllTsMerged(continueMerge);
- break;
- case MERGE_END:
- cleanUp(continueMerge);
- break;
- default:
- throw new UnsupportedOperationException(taskName + " found unrecognized status " + status);
- }
- if (logger.isInfoEnabled()) {
- logger.info(
- "{} merge recovery ends after {}ms", taskName, (System.currentTimeMillis() - startTime));
- }
- }
-
- private void resumeAfterFilesLogged(boolean continueMerge) throws IOException {
- if (continueMerge) {
- resumeMergeProgress();
- calculateConcurrentSeriesNum();
- if (concurrentMergeSeriesNum == 0) {
- throw new IOException(
- "Merge cannot be resumed under current memory budget, please "
- + "increase the budget or disable continueMergeAfterReboot");
- }
-
- MergeMultiChunkTask mergeChunkTask =
- new MergeMultiChunkTask(
- mergeContext,
- taskName,
- inplaceCompactionLogger,
- resource,
- fullMerge,
- analyzer.getUnmergedPaths(),
- concurrentMergeSeriesNum,
- storageGroupName);
- analyzer.setUnmergedPaths(null);
- mergeChunkTask.mergeSeries();
-
- MergeFileTask mergeFileTask =
- new MergeFileTask(
- taskName, mergeContext, inplaceCompactionLogger, resource, resource.getSeqFiles());
- mergeFileTask.mergeFiles();
- }
- cleanUp(continueMerge);
- }
-
- private void resumeAfterAllTsMerged(boolean continueMerge) throws IOException {
- if (continueMerge) {
- resumeMergeProgress();
- MergeFileTask mergeFileTask =
- new MergeFileTask(
- taskName,
- mergeContext,
- inplaceCompactionLogger,
- resource,
- analyzer.getUnmergedFiles());
- analyzer.setUnmergedFiles(null);
- mergeFileTask.mergeFiles();
- } else {
- // NOTICE: although some of the seqFiles may have been truncated in last merge, we do not
- // recover them here because later TsFile recovery will recover them
- truncateFiles();
- }
- cleanUp(continueMerge);
- }
-
- private void resumeMergeProgress() throws IOException {
- inplaceCompactionLogger = new InplaceCompactionLogger(storageGroupSysDir);
- truncateFiles();
- recoverChunkCounts();
- }
-
- private void calculateConcurrentSeriesNum() throws IOException {
- long singleSeriesUnseqCost = 0;
- long maxUnseqCost = 0;
- for (TsFileResource unseqFile : resource.getUnseqFiles()) {
- long[] chunkNums =
- MergeUtils.findTotalAndLargestSeriesChunkNum(
- unseqFile, resource.getFileReader(unseqFile));
- long totalChunkNum = chunkNums[0];
- long maxChunkNum = chunkNums[1];
- singleSeriesUnseqCost += unseqFile.getTsFileSize() * maxChunkNum / totalChunkNum;
- maxUnseqCost += unseqFile.getTsFileSize();
- }
-
- long singleSeriesSeqReadCost = 0;
- long maxSeqReadCost = 0;
- long seqWriteCost = 0;
- for (TsFileResource seqFile : resource.getSeqFiles()) {
- long[] chunkNums =
- MergeUtils.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
- long totalChunkNum = chunkNums[0];
- long maxChunkNum = chunkNums[1];
- long fileMetaSize = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
- long newSingleSeriesSeqReadCost = fileMetaSize * maxChunkNum / totalChunkNum;
- singleSeriesSeqReadCost = Math.max(newSingleSeriesSeqReadCost, singleSeriesSeqReadCost);
- maxSeqReadCost = Math.max(fileMetaSize, maxSeqReadCost);
- seqWriteCost += fileMetaSize;
- }
-
- long memBudget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
- int lb = 0;
- int ub = MaxSeriesMergeFileSelector.MAX_SERIES_NUM;
- int mid = (lb + ub) / 2;
- while (mid != lb) {
- long unseqCost = Math.min(singleSeriesUnseqCost * mid, maxUnseqCost);
- long seqReadCos = Math.min(singleSeriesSeqReadCost * mid, maxSeqReadCost);
- long totalCost = unseqCost + seqReadCos + seqWriteCost;
- if (totalCost <= memBudget) {
- lb = mid;
- } else {
- ub = mid;
- }
- mid = (lb + ub) / 2;
- }
- concurrentMergeSeriesNum = lb;
- }
-
- // scan the metadata to compute how many chunks are merged/unmerged so at last we can decide to
- // move the merged chunks or the unmerged chunks
- private void recoverChunkCounts() throws IOException {
- logger.info("{} recovering chunk counts", taskName);
- int fileCnt = 1;
- for (TsFileResource tsFileResource : resource.getSeqFiles()) {
- logger.info(
- "{} recovering {} {}/{}",
- taskName,
- tsFileResource.getTsFile().getName(),
- fileCnt,
- resource.getSeqFiles().size());
- RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(tsFileResource);
- mergeFileWriter.makeMetadataVisible();
- mergeContext.getUnmergedChunkStartTimes().put(tsFileResource, new HashMap<>());
- List<PartialPath> pathsToRecover = analyzer.getMergedPaths();
- int cnt = 0;
- double progress = 0.0;
- for (PartialPath path : pathsToRecover) {
- recoverChunkCounts(path, tsFileResource, mergeFileWriter);
- if (logger.isInfoEnabled()) {
- cnt += 1.0;
- double newProgress = 100.0 * cnt / pathsToRecover.size();
- if (newProgress - progress >= 1.0) {
- progress = newProgress;
- logger.info(
- "{} {}% series count of {} are recovered",
- taskName, progress, tsFileResource.getTsFile().getName());
- }
- }
- }
- fileCnt++;
- }
- analyzer.setMergedPaths(null);
- }
-
- private void recoverChunkCounts(
- PartialPath path, TsFileResource tsFileResource, RestorableTsFileIOWriter mergeFileWriter)
- throws IOException {
- mergeContext.getUnmergedChunkStartTimes().get(tsFileResource).put(path, new ArrayList<>());
-
- List<ChunkMetadata> seqFileChunks = resource.queryChunkMetadata(path, tsFileResource);
- List<ChunkMetadata> mergeFileChunks =
- mergeFileWriter.getVisibleMetadataList(path.getDevice(), path.getMeasurement(), null);
- mergeContext
- .getMergedChunkCnt()
- .compute(
- tsFileResource,
- (k, v) -> v == null ? mergeFileChunks.size() : v + mergeFileChunks.size());
- int seqChunkIndex = 0;
- int mergeChunkIndex = 0;
- int unmergedCnt = 0;
- while (seqChunkIndex < seqFileChunks.size() && mergeChunkIndex < mergeFileChunks.size()) {
- ChunkMetadata seqChunk = seqFileChunks.get(seqChunkIndex);
- ChunkMetadata mergedChunk = mergeFileChunks.get(mergeChunkIndex);
- if (seqChunk.getStartTime() < mergedChunk.getStartTime()) {
- // this seqChunk is unmerged
- unmergedCnt++;
- seqChunkIndex++;
- mergeContext
- .getUnmergedChunkStartTimes()
- .get(tsFileResource)
- .get(path)
- .add(seqChunk.getStartTime());
- } else if (mergedChunk.getStartTime() <= seqChunk.getStartTime()
- && seqChunk.getStartTime() <= mergedChunk.getEndTime()) {
- // this seqChunk is merged
- seqChunkIndex++;
- } else {
- // seqChunk.startTime > mergeChunk.endTime, find next mergedChunk that may cover the
- // seqChunk
- mergeChunkIndex++;
- }
- }
- int finalUnmergedCnt = unmergedCnt;
- mergeContext
- .getUnmergedChunkCnt()
- .compute(tsFileResource, (k, v) -> v == null ? finalUnmergedCnt : v + finalUnmergedCnt);
- }
-
- private void truncateFiles() throws IOException {
- logger.info("{} truncating {} files", taskName, analyzer.getFileLastPositions().size());
- for (Entry<File, Long> entry : analyzer.getFileLastPositions().entrySet()) {
- File file = entry.getKey();
- Long lastPosition = entry.getValue();
- if (file.exists() && file.length() != lastPosition) {
- try (FileInputStream fileInputStream = new FileInputStream(file)) {
- FileChannel channel = fileInputStream.getChannel();
- channel.truncate(lastPosition);
- channel.close();
- }
- }
- }
- analyzer.setFileLastPositions(null);
- }
-}