You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/11 13:39:53 UTC
[incubator-iotdb] branch dev_merge updated: add merge recovery in system reboot
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_merge by this push:
new 23b6ad8 add merge recovery in system reboot
23b6ad8 is described below
commit 23b6ad899e7cfa3489b4bb476987a9b5b8dbe81c
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jul 11 21:37:35 2019 +0800
add merge recovery in system reboot
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++
.../iotdb/db/engine/merge/MergeFileSelector.java | 5 +-
.../apache/iotdb/db/engine/merge/MergeTask.java | 50 +++++++-
.../iotdb/db/engine/merge/RecoverMergeTask.java | 13 +-
.../engine/storagegroup/StorageGroupProcessor.java | 134 +++++++++++++++------
.../db/engine/storagegroup/TsFileResource.java | 11 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 1 +
7 files changed, 181 insertions(+), 43 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index badd42e..680c1e3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -184,6 +184,8 @@ public class IoTDBConfig {
private int mergeThreadNum = 2;
+ /**
+ * Passed as the continueMerge argument of RecoverMergeTask.recoverMerge() when a storage group
+ * is recovered at startup (StorageGroupProcessor.recover()). Enabled by default.
+ */
+ private boolean mergeAfterReboot = true;
+
public IoTDBConfig() {
// empty constructor
}
@@ -507,4 +509,12 @@ public class IoTDBConfig {
public void setMergeThreadNum(int mergeThreadNum) {
this.mergeThreadNum = mergeThreadNum;
}
+
+ /** @return the value passed as continueMerge to RecoverMergeTask.recoverMerge() on reboot */
+ public boolean isMergeAfterReboot() {
+ return mergeAfterReboot;
+ }
+
+ /** @param mergeAfterReboot the value to pass as continueMerge during reboot-time recovery */
+ public void setMergeAfterReboot(boolean mergeAfterReboot) {
+ this.mergeAfterReboot = mergeAfterReboot;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileSelector.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileSelector.java
index 95d6131..c94eea9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileSelector.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeFileSelector.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.MetadataErrorException;
@@ -62,8 +63,8 @@ public class MergeFileSelector {
public MergeFileSelector(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles, long memoryBudget) {
- this.seqFiles = seqFiles;
- this.unseqFiles = unseqFiles;
+ // only files whose isClosed() is true are merge candidates; the filtered lists are new
+ // copies, so the caller's lists are not modified
+ this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
+ this.unseqFiles = unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
this.memoryBudget = memoryBudget;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index 1c5366e..39308a9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -26,6 +26,7 @@ import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -101,17 +102,29 @@ public class MergeTask implements Callable<Void> {
protected MergeCallback callback;
+ // identifies this task in log messages
+ protected String taskName;
+
public MergeTask(List<TsFileResource> seqFiles,
- List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback) throws IOException {
+ List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback,
+ String taskName) throws IOException {
this.seqFiles = seqFiles;
this.unseqFiles = unseqFiles;
this.storageGroupDir = storageGroupDir;
this.callback = callback;
+ this.taskName = taskName;
}
@Override
public Void call() throws Exception {
- doMerge();
+ try {
+ doMerge();
+ } catch (Exception e) {
+ logger.error("Runtime exception in merge {}", taskName, e);
+ // empty file lists so that the callback run by cleanUp(true) presumably receives an empty
+ // seqFiles list, which StorageGroupProcessor.mergeEndAction treats as an abnormal end
+ seqFiles = Collections.emptyList();
+ unseqFiles = Collections.emptyList();
+ try {
+ cleanUp(true);
+ } catch (IOException cleanUpException) {
+ // do not let a cleanup failure hide the exception that aborted the merge
+ e.addSuppressed(cleanUpException);
+ }
+ throw e;
+ }
return null;
}
@@ -131,17 +144,34 @@ public class MergeTask implements Callable<Void> {
protected void mergeFiles(List<TsFileResource> unmergedFiles) throws IOException {
// decide whether to write the unmerged chunks to the merge files or to move the merged data
// back to the origin seqFile's
+ if (logger.isInfoEnabled()) {
+ logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
+ }
+ int cnt = 0;
for (TsFileResource seqFile : unmergedFiles) {
int mergedChunkNum = mergedChunkCnt.getOrDefault(seqFile, 0);
int unmergedChunkNum = unmergedChunkCnt.getOrDefault(seqFile, 0);
if (mergedChunkNum >= unmergedChunkNum) {
// move the unmerged data to the new file
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} moving unmerged data of {} to the merged file", taskName,
+ seqFile.getFile().getName());
+ }
moveUnmergedToNew(seqFile);
} else {
// move the merged data to the old file
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} moving merged data of {} to the old file", taskName,
+ seqFile.getFile().getName());
+ }
moveMergedToOld(seqFile);
}
+ cnt++;
+ // the guard must match the level that is actually logged
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} has merged {}/{} files", taskName, cnt, unmergedFiles.size());
+ }
}
+ logger.info("{} has merged all files", taskName);
mergeLogger.logMergeEnd();
}
@@ -152,20 +182,35 @@ public class MergeTask implements Callable<Void> {
}
protected void mergeSeries(List<Path> unmergedSeries) throws IOException {
+ if (logger.isInfoEnabled()) {
+ logger.info("{} starts to merge {} series", taskName, unmergedSeries.size());
+ }
for (TsFileResource seqFile : seqFiles) {
unmergedChunkStartTimes.put(seqFile, new HashMap<>());
}
// merge each series and write data into each seqFile's temp merge file
+ int mergedCnt = 0;
+ double progress = 0.0;
for (Path path : unmergedSeries) {
mergeLogger.logTSStart(path);
mergeOnePath(path);
mergeLogger.logTSEnd(path);
+ mergedCnt++;
+ if (logger.isDebugEnabled()) {
+ // progress is a percentage (0-100), so log at most once per percentage point;
+ // 100.0 keeps the multiplication in floating point (no int overflow)
+ double newProgress = 100.0 * mergedCnt / unmergedSeries.size();
+ if (newProgress - progress >= 1.0) {
+ progress = newProgress;
+ logger.debug("{} has merged {}% series", taskName, progress);
+ }
+ }
}
+ logger.info("{} all series are merged", taskName);
mergeLogger.logAllTsEnd();
}
protected void cleanUp(boolean executeCallback) throws IOException {
+ logger.info("{} is cleaning up", taskName);
for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
sequenceReader.close();
}
@@ -289,6 +334,7 @@ public class MergeTask implements Callable<Void> {
seqFile.getMergeQueryLock().writeLock().lock();
try {
+ // FileUtils.moveFile refuses an existing destination, so the old file is removed first;
+ // if that delete fails, moveFile throws and the failure surfaces as an IOException
+ seqFile.getFile().delete();
FileUtils.moveFile(fileWriter.getFile(), seqFile.getFile());
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
index 740e854..a4b9c09 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
@@ -36,9 +36,13 @@ import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RecoverMergeTask extends MergeTask {
+ // dedicated logger so that recovery messages are attributed to RecoverMergeTask; inside this
+ // class it is the one referenced by `logger`
+ private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
+
private String currLine;
private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
@@ -53,19 +57,21 @@ public class RecoverMergeTask extends MergeTask {
public RecoverMergeTask(
List<TsFileResource> allSeqFiles,
List<TsFileResource> allUnseqFiles,
- String storageGroupDir, MergeCallback callback) throws IOException {
- super(allSeqFiles, allUnseqFiles, storageGroupDir, callback);
+ // taskName is forwarded to MergeTask, where it tags log messages
+ String storageGroupDir, MergeCallback callback, String taskName) throws IOException {
+ super(allSeqFiles, allUnseqFiles, storageGroupDir, callback, taskName);
}
public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
if (!logFile.exists()) {
+ logger.info("{} no merge.log, merge recovery ends", taskName);
return;
}
mergeSeqFiles = new ArrayList<>();
mergeUnseqFiles = new ArrayList<>();
Status status = determineStatus(logFile);
+ logger.info("{} merge recovery status determined: {}", taskName, status);
switch (status) {
case NONE:
logFile.delete();
@@ -98,12 +104,14 @@ public class RecoverMergeTask extends MergeTask {
cleanUp(continueMerge);
break;
}
+ logger.info("{} merge recovery ends", taskName);
}
// scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
// move the merged chunks or the unmerged chunks
private void recoverChunkCounts() throws IOException {
+ logger.debug("{} recovering chunk counts", taskName);
for (TsFileResource tsFileResource : seqFiles) {
RestorableTsFileIOWriter mergeFileWriter = getMergeFileWriter(tsFileResource);
mergeFileWriter.makeMetadataVisible();
@@ -150,6 +158,7 @@ public class RecoverMergeTask extends MergeTask {
}
private void truncateFiles() throws IOException {
+ logger.debug("{} truncating {} files", taskName, fileLastPositions.size());
for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
File file = entry.getKey();
Long lastPosition = entry.getValue();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e3876c3..f805feb 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import static org.apache.iotdb.db.engine.merge.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.SystemConstant.TSFILE_SUFFIX;
import java.io.File;
@@ -39,6 +41,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.merge.MergeFileSelector;
import org.apache.iotdb.db.engine.merge.MergeTask;
import org.apache.iotdb.db.engine.merge.MergeManager;
+import org.apache.iotdb.db.engine.merge.RecoverMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -48,6 +51,7 @@ import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -184,13 +188,23 @@ public class StorageGroupProcessor {
private void recover() throws ProcessorException {
logger.info("recover Storage Group {}", storageGroupName);
- // collect TsFiles from sequential data directory
- List<File> tsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
- recoverSeqFiles(tsFiles);
+ // collect TsFiles from sequential and unsequential data directory
+ List<TsFileResource> seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+ List<TsFileResource> unseqTsFiles =
+ getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
- // collect TsFiles from unsequential data directory
- tsFiles = getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
- recoverUnseqFiles(tsFiles);
+ // let RecoverMergeTask handle a merge interrupted by the crash (it returns immediately when
+ // there is no merge.log); the name uses the same "<sg>-<millis>" format as merge()
+ String taskName = storageGroupName + "-" + System.currentTimeMillis();
+ try {
+ RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
+ storageGroupSysDir.getPath(), this::mergeEndAction, taskName);
+ logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
+ recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isMergeAfterReboot());
+ } catch (IOException | MetadataErrorException e) {
+ throw new ProcessorException(e);
+ }
+
+ // a merge finished by the recovery deletes the merged unseq files from disk
+ // (mergeEndAction -> TsFileResource.remove()), so do not try to recover those resources
+ unseqTsFiles.removeIf(resource -> !resource.getFile().exists());
+
+ recoverSeqFiles(seqTsFiles);
+ recoverUnseqFiles(unseqTsFiles);
for (TsFileResource resource : sequenceFileList) {
latestTimeForEachDevice.putAll(resource.getEndTimeMap());
@@ -198,23 +212,47 @@ public class StorageGroupProcessor {
}
}
- private List<File> getAllFiles(List<String> folders) {
+ /**
+ * Lists the TsFiles of this storage group under the given data folders, sorted by
+ * compareFileName, after finishing renames that a crash may have interrupted.
+ */
+ private List<TsFileResource> getAllFiles(List<String> folders) {
List<File> tsFiles = new ArrayList<>();
for (String baseDir : folders) {
File fileFolder = new File(baseDir, storageGroupName);
if (!fileFolder.exists()) {
continue;
}
+ // some TsFileResource may be persisting when the system crashed, try recovering such
+ // resources
+ continueFailedRenames(fileFolder, TEMP_SUFFIX);
+
+ // some TsFiles were going to be replaced by the merged files when the system crashed and
+ // the process was interrupted before the merged files could be named
+ continueFailedRenames(fileFolder, MERGE_SUFFIX);
+
- Collections
- .addAll(tsFiles, fileFolder.listFiles(file -> file.getName().endsWith(TSFILE_SUFFIX)));
+ // listFiles returns null on an I/O error or when the path is not a directory
+ File[] folderTsFiles = fileFolder.listFiles(file -> file.getName().endsWith(TSFILE_SUFFIX));
+ if (folderTsFiles != null) {
+ Collections.addAll(tsFiles, folderTsFiles);
+ }
}
- return tsFiles;
+ tsFiles.sort(this::compareFileName);
+ List<TsFileResource> ret = new ArrayList<>();
+ tsFiles.forEach(f -> ret.add(new TsFileResource(f)));
+ return ret;
}
- private void recoverSeqFiles(List<File> tsFiles) throws ProcessorException {
- tsFiles.sort(this::compareFileName);
- for (File tsFile : tsFiles) {
- TsFileResource tsFileResource = new TsFileResource(tsFile);
+ /**
+ * For every file in fileFolder named "<origin><suffix>": deletes it if <origin> still exists,
+ * otherwise renames it to <origin>.
+ * NOTE(review): for MERGE_SUFFIX this runs before RecoverMergeTask (recover() calls getAllFiles
+ * first) — confirm that the merge recovery never needs merge files whose origin still exists.
+ */
+ private void continueFailedRenames(File fileFolder, String suffix) {
+ File[] files = fileFolder.listFiles(file -> file.getName().endsWith(suffix));
+ if (files == null) {
+ return;
+ }
+ for (File tempFile : files) {
+ // strip only the trailing suffix: String.replace would also remove any earlier occurrence
+ // of the suffix elsewhere in the path (e.g. in a directory name)
+ String tempPath = tempFile.getPath();
+ File originFile = new File(tempPath.substring(0, tempPath.length() - suffix.length()));
+ if (originFile.exists()) {
+ // the interrupted replacement had not removed the origin yet, so the leftover is dropped
+ if (!tempFile.delete()) {
+ logger.warn("{} cannot delete the leftover file {}", storageGroupName, tempFile);
+ }
+ } else if (!tempFile.renameTo(originFile)) {
+ logger.warn("{} cannot rename {} to {}", storageGroupName, tempFile, originFile);
+ }
+ }
+ }
+
+ private void recoverSeqFiles(List<TsFileResource> tsFiles) throws ProcessorException {
+ for (TsFileResource tsFileResource : tsFiles) {
sequenceFileList.add(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
, fileSchema, versionController, tsFileResource, false);
@@ -222,10 +260,8 @@ public class StorageGroupProcessor {
}
}
- private void recoverUnseqFiles(List<File> tsFiles) throws ProcessorException {
- tsFiles.sort(this::compareFileName);
- for (File tsFile : tsFiles) {
- TsFileResource tsFileResource = new TsFileResource(tsFile);
+ private void recoverUnseqFiles(List<TsFileResource> tsFiles) throws ProcessorException {
+ // no sort here: getAllFiles() already returns the resources sorted by compareFileName
+ for (TsFileResource tsFileResource : tsFiles) {
unSequenceFileList.add(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
fileSchema,
@@ -660,6 +696,7 @@ public class StorageGroupProcessor {
}
if (unSequenceFileList.isEmpty() || sequenceFileList.isEmpty()) {
logger.info("{} no files to be merged", storageGroupName);
+ // a merge needs both sequence and unsequence files
+ return;
}
long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
@@ -672,12 +709,15 @@ public class StorageGroupProcessor {
budget);
return;
}
+ // the task name tags the merge's log messages
+ String taskName = storageGroupName + "-" + System.currentTimeMillis();
MergeTask mergeTask = new MergeTask(mergeFiles[0], mergeFiles[1],
- storageGroupSysDir.getPath(), this::mergeEndAction);
+ storageGroupSysDir.getPath(), this::mergeEndAction, taskName);
+ // assigned before submit so it is set when mergeEndAction copies its modifications into the
+ // merged seq files' mod files
+ mergingModification = new ModificationFile(storageGroupSysDir + File.separator + "merge"
+ + ".mods");
MergeManager.getINSTANCE().submit(mergeTask);
if (logger.isInfoEnabled()) {
- logger.info("{} submits a merge task, merging {} seqFiles, {} unseqFiles",
- storageGroupName, mergeFiles[0].size(), mergeFiles[1].size());
+ logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
+ storageGroupName, taskName, mergeFiles[0].size(), mergeFiles[1].size());
}
isMerging = true;
mergeStartTime = System.currentTimeMillis();
@@ -693,6 +733,15 @@ public class StorageGroupProcessor {
}
private void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
+ logger.info("{} a merge task is ending...", storageGroupName);
+
+ if (seqFiles.isEmpty()) {
+ // the merge failed: MergeTask.call() empties its file lists before cleanUp(true), so only
+ // the merging flag is reset here
+ isMerging = false;
+ logger.info("{} a merge task abnormally ends", storageGroupName);
+ return;
+ }
+
mergeLock.writeLock().lock();
try {
unSequenceFileList.removeAll(unseqFiles);
@@ -700,32 +749,47 @@ public class StorageGroupProcessor {
mergeLock.writeLock().unlock();
}
+ // delete the merged unseq files from disk, each under its merge-query write lock
+ for (TsFileResource unseqFile : unseqFiles) {
+ unseqFile.getMergeQueryLock().writeLock().lock();
+ try {
+ unseqFile.remove();
+ } finally {
+ unseqFile.getMergeQueryLock().writeLock().unlock();
+ }
+ }
+
for (int i = 0; i < seqFiles.size(); i++) {
TsFileResource seqFile = seqFiles.get(i);
seqFile.getMergeQueryLock().writeLock().lock();
mergeLock.writeLock().lock();
try {
- // remove old modifications and write modifications generated during merge
- seqFile.removeModFile();
- for (Modification modification : mergingModification.getModifications()) {
- seqFile.getModFile().write(modification);
- }
- } catch (IOException e) {
- logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
- seqFile.getFile(), e);
- }
- if (i == seqFiles.size() - 1) {
+ logger.debug("{} is updating the {} merged file's modification file", storageGroupName, i);
try {
- mergingModification.remove();
+ // remove old modifications and write modifications generated during merge
+ seqFile.removeModFile();
+ // mergingModification is assigned by merge(); recover() does not assign it before its
+ // RecoverMergeTask invokes this callback, so it may be null here
+ if (mergingModification != null) {
+ for (Modification modification : mergingModification.getModifications()) {
+ seqFile.getModFile().write(modification);
+ }
+ }
} catch (IOException e) {
- logger.error("{} cannot remove merging modification ", storageGroupName, e);
+ logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
+ seqFile.getFile(), e);
}
- mergingModification = null;
- isMerging = false;
+ if (i == seqFiles.size() - 1) {
+ // the last seq file is done: drop the merge-time modifications and end the merge
+ if (mergingModification != null) {
+ try {
+ mergingModification.remove();
+ } catch (IOException e) {
+ logger.error("{} cannot remove merging modification ", storageGroupName, e);
+ }
+ }
+ mergingModification = null;
+ isMerging = false;
+ }
+ } finally {
+ seqFile.getMergeQueryLock().writeLock().unlock();
+ mergeLock.writeLock().unlock();
}
- seqFile.getMergeQueryLock().writeLock().unlock();
- mergeLock.writeLock().unlock();
}
+ logger.info("{} a merge task ends", storageGroupName);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index b00e422..f2484c1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -123,8 +123,10 @@ public class TsFileResource {
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
}
- FileUtils.moveFile(new File(file + RESOURCE_SUFFIX + TEMP_SUFFIX),
- new File(file + RESOURCE_SUFFIX));
+ File src = new File(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
+ File dest = new File(file + RESOURCE_SUFFIX);
+ // FileUtils.moveFile refuses an existing destination, so the old resource file is deleted
+ // first; if the process dies in between, only the TEMP_SUFFIX file remains and
+ // StorageGroupProcessor.getAllFiles() renames it back on reboot
+ dest.delete();
+ FileUtils.moveFile(src, dest);
}
public void deSerialize() throws IOException {
@@ -240,4 +242,9 @@ public class TsFileResource {
getModFile().remove();
modFile = null;
}
+
+ /**
+ * Deletes the TsFile and its resource file (path + RESOURCE_SUFFIX) from disk, best effort:
+ * the results of File.delete() are ignored. The mod file is not touched.
+ * NOTE(review): confirm whether the mod file of a removed file should be deleted as well.
+ */
+ public void remove() {
+ file.delete();
+ new File(file.getPath() + RESOURCE_SUFFIX).delete();
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f77c353..1b6befa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -400,6 +400,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return true;
case "merge":
StorageEngine.getInstance().mergeAll();
+ // without this return the case fell through to default and reported false
+ return true;
default:
return false;
}