You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/05/24 06:11:04 UTC
[incubator-iotdb] branch refactor_overflow updated: update merge
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_overflow by this push:
new ffedb93 update merge
ffedb93 is described below
commit ffedb9368ed1bc39f3140e42d0fad5879d088ac2
Author: 江天 <jt...@163.com>
AuthorDate: Fri May 24 14:09:15 2019 +0800
update merge
---
.../db/engine/sgmanager/StorageGroupProcessor.java | 154 +++++++++++++++++++--
.../db/engine/tsfiledata/TsFileProcessor.java | 46 +++++-
2 files changed, 185 insertions(+), 15 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index 77fc48d..c658b8e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -21,8 +21,13 @@ package org.apache.iotdb.db.engine.sgmanager;
import static java.time.ZonedDateTime.ofInstant;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
import java.io.IOException;
+import java.nio.file.Files;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -44,6 +49,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.Processor;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.merge.MergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
@@ -92,7 +98,12 @@ import org.slf4j.LoggerFactory;
public class StorageGroupProcessor extends Processor implements IStatistic {
private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupProcessor.class);
+
+ public static final String MERGE_TEMP_SUFFIX = ".merging";
+ private static final String OLD_FILE_RECORD = "MERGE_OLD_FILES";
+
private final String statStorageGroupName;
+ private String fileNodeDir;
private TsFileProcessor tsFileProcessor;
private OverflowProcessor overflowProcessor;
@@ -148,6 +159,13 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
LOGGER.info("The directory of the filenode processor {} doesn't exist. Create new directory {}",
getProcessorName(), systemFolder.getAbsolutePath());
}
+ fileNodeDir = systemFolder.getAbsolutePath();
+ try {
+ cleanLastMerge();
+ } catch (IOException e) {
+ throw new TsFileProcessorException(e);
+ }
+
//the version controller is shared by tsfile and overflow processor.
try {
versionController = new SimpleFileVersionController(systemFolder.getAbsolutePath());
@@ -582,10 +600,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
lastUnseqIndex = currUnseqGroup.size();
}
- if (currUnseqGroup.size() > 0) {
- seqGroups.add(currSeqGroup);
- unseqGroups.add(currUnseqGroup);
- }
+ seqGroups.add(currSeqGroup);
+ unseqGroups.add(currUnseqGroup);
}
return new Pair<>(seqGroups, unseqGroups);
}
@@ -626,6 +642,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
public void merge() throws StorageGroupProcessorException {
writeLock();
Pair<List<List<TsFileResource>>, List<List<TsFileResource>>> fileGroups;
+ List<TsFileResource> unseqFiles;
+ List<TsFileResource> seqFiles;
try {
// close seqFile and overflow(unseqFile), prepare for merge
LOGGER.info("The StorageGroupProcessor {} begins to merge.", processorName);
@@ -634,8 +652,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
isMerging = true;
// find what files are to be processed in this merge
- List<TsFileResource> seqFiles = tsFileProcessor.getTsFileResources();
- List<TsFileResource> unseqFiles = overflowProcessor.getTsFileResources();
+ seqFiles = new ArrayList<>(tsFileProcessor.getTsFileResources());
+ unseqFiles = new ArrayList<>(overflowProcessor.getTsFileResources());
fileGroups = groupMergeFiles(seqFiles, unseqFiles);
} catch (TsFileProcessorException e) {
@@ -650,6 +668,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
LOGGER.info("{}: There are {} groups to be merged", processorName, seqGroups.size());
+
for (int i = 0; i < seqGroups.size(); i++) {
List<TsFileResource> seqGroup = seqGroups.get(i);
List<TsFileResource> unseqGroup = unseqGroups.get(i);
@@ -661,21 +680,97 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
LOGGER.info("{}: Starting merging the {} group, there are {} seq groups and {} unseq groups",
processorName, i, seqFileSize, unseqFileSize);
TsFileResource newResource = mergeFileGroup(seqGroup, unseqGroup);
+ try {
+ addMergedFile(newResource, seqGroup, unseqGroup);
+ } catch (IOException e) {
+ LOGGER.error("Cannot merge the {} group because:", e);
+ continue;
+ }
long endTime = System.currentTimeMillis();
+ LOGGER.info("{}: End merging the {} group after {}ms", processorName, i, endTime - startTime);
}
+ }
+ /**
+  * Blocks until oldMultiPassTokenSet is empty, polling it with a growing sleep interval: the
+  * interval starts at 1 second and doubles after every completed sleep, capped at 60 seconds.
+  * <p>
+  * An interrupt does not abort the wait, because the caller deletes the old files right after
+  * this method returns. The interrupt is remembered and the thread's interrupt status is
+  * restored once the wait is over.
+  */
+ private void waitForQueriesOnOldFile() {
+   long sleepTime = 1000;
+   long maxSleepTime = 60 * 1000L;
+   // Do not re-assert the interrupt flag inside the loop: with the flag set Thread.sleep()
+   // throws immediately, which would turn this loop into a busy spin that floods the log.
+   boolean interrupted = false;
+   try {
+     while (!oldMultiPassTokenSet.isEmpty()) {
+       try {
+         Thread.sleep(sleepTime);
+         sleepTime = Math.min(sleepTime * 2, maxSleepTime);
+       } catch (InterruptedException e) {
+         interrupted = true;
+         LOGGER.warn("{}: unexpected interuption when waiting for queries on old files",
+             processorName, e);
+       }
+     }
+   } finally {
+     if (interrupted) {
+       // hand the interrupt over to the caller now that waiting has finished
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
- // change status from merge to wait
- switchMergeToWaiting(backupIntervalFiles, needEmtpy);
+ /**
+  * Installs the output of one merged file group and disposes of the group's input files.
+  * <p>
+  * While holding the write lock it writes the merge record, replaces the group's files in the
+  * file lists of tsFileProcessor and overflowProcessor, strips MERGE_TEMP_SUFFIX from the merged
+  * file's name and rotates the query token sets. After releasing the lock it waits until
+  * oldMultiPassTokenSet is empty, deletes the old files and finally deletes the merge record.
+  * <p>
+  * If mergedFile is itself one of the input files (mergeFileGroup returns seqGroup.get(0) when
+  * the unseq group is empty) nothing was merged, so the method returns without touching any
+  * file. Otherwise the rename target would equal the source and deleting the target would
+  * destroy the data.
+  *
+  * @param mergedFile the resource returned by mergeFileGroup for this group
+  * @param seqGroup the sequence files of the group
+  * @param unseqGroup the unsequence (overflow) files of the group
+  * @throws IOException if the merge record cannot be written, the merged file cannot be
+  *     renamed, or an old file or the merge record cannot be deleted
+  */
+ private void addMergedFile(TsFileResource mergedFile, List<TsFileResource> seqGroup,
+     List<TsFileResource> unseqGroup) throws IOException {
+   boolean nothingMerged = seqGroup.stream().anyMatch(oldFile -> oldFile == mergedFile)
+       || unseqGroup.stream().anyMatch(oldFile -> oldFile == mergedFile);
+   if (nothingMerged) {
+     LOGGER.info("{}: the merge result is one of the input files, nothing to replace",
+         processorName);
+     return;
+   }
+
+   writeLock();
+   try {
+     // record merge old files so that they can be deleted after system crash
+     recordOldFiles(mergedFile, seqGroup, unseqGroup);
+     // update file lists of seqProcessor and unseqProcessor
+     tsFileProcessor.replaceFiles(seqGroup, mergedFile);
+     overflowProcessor.replaceFiles(unseqGroup, null);
+     // rename the new file so that it will not be deleted after system crash
+     String newName = mergedFile.getFile().getName().replace(MERGE_TEMP_SUFFIX, "");
+     File newFile = new File(mergedFile.getFile().getParent(), newName);
+     // remove a leftover file with the final name, if any, so that the rename can succeed
+     //noinspection ResultOfMethodCallIgnored
+     newFile.delete();
+     if (!mergedFile.getFile().renameTo(newFile)) {
+       throw new IOException(String.format("Cannot rename merged file %s",
+           mergedFile.getFilePath()));
+     }
+     // now new queries will no longer use old files, so record new queries in another token set
+     oldMultiPassTokenSet = newMultiPassTokenSet;
+     newMultiPassTokenSet = new HashSet<>();
+   } finally {
+     writeUnlock();
+   }
+   // wait until all old queries end
+   waitForQueriesOnOldFile();
+   // remove old files
+   for (TsFileResource oldFile : seqGroup) {
+     Files.deleteIfExists(oldFile.getFile().toPath());
+   }
+   for (TsFileResource oldFile : unseqGroup) {
+     Files.deleteIfExists(oldFile.getFile().toPath());
+   }
+   // remove old file records
+   Files.deleteIfExists(getMergeFileRecord().toPath());
+ }
+
+ /**
+  * Locates the merge record of this storage group.
+  *
+  * @return the file named OLD_FILE_RECORD inside fileNodeDir; the file is not created or
+  *     checked for existence here
+  */
+ private File getMergeFileRecord() {
+   final File mergeRecord = new File(fileNodeDir, OLD_FILE_RECORD);
+   return mergeRecord;
+ }
- // change status from wait to work
- switchWaitingToWorking();
+ /**
+  * Appends the files involved in one merge to the merge record, one path per line: first the
+  * sequence files, then the unsequence files, and last the merged new file. cleanLastMerge()
+  * relies on the new file being the last line.
+  * <p>
+  * Every line, including the last one, is terminated with a line break. The record is opened
+  * in append mode, so an unterminated last line would be fused with the first path of the next
+  * record if an earlier record was left behind.
+  *
+  * @param newFile the merged file, written as the last line
+  * @param seqGroup the sequence files that were merged
+  * @param unseqGroup the unsequence files that were merged
+  * @throws IOException if the merge record cannot be opened or written
+  */
+ private void recordOldFiles(TsFileResource newFile,
+     List<TsFileResource> seqGroup, List<TsFileResource> unseqGroup)
+     throws IOException {
+   File oldFileRecord = getMergeFileRecord();
+   try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(oldFileRecord, true))) {
+     for (TsFileResource resource : seqGroup) {
+       bufferedWriter.write(resource.getFilePath());
+       bufferedWriter.write("\n");
+     }
+     for (TsFileResource resource : unseqGroup) {
+       bufferedWriter.write(resource.getFilePath());
+       bufferedWriter.write("\n");
+     }
+     bufferedWriter.write(newFile.getFilePath());
+     // terminate the last line as well, see the method comment
+     bufferedWriter.write("\n");
+   }
+ }
private TsFileResource mergeFileGroup(List<TsFileResource> seqGroup, List<TsFileResource> unseqGroup)
throws StorageGroupProcessorException {
+ if (unseqGroup.isEmpty()) {
+ return seqGroup.get(0);
+ }
// find all timeseries (possibly) of each device (the key) contained in these files.
- Map<String, List<Path>> pathMap = null;
+ Map<String, List<Path>> pathMap;
try {
pathMap = resolveMergePaths(seqGroup, unseqGroup);
} catch (PathErrorException e) {
@@ -686,7 +781,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
mergeBaseDir = Directories.getInstance().getNextFolderForTsfile();
mergeFileName = minimumTime
- + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis();
+ + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis()
+ + MERGE_TEMP_SUFFIX;
mergeOutputPath = constructOutputFilePath(mergeBaseDir, getProcessorName(),
mergeFileName);
mergeFileName = getProcessorName() + File.separatorChar + mergeFileName;
@@ -923,4 +1019,38 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
}
}
+ /**
+  * Completes or discards a merge that was cut short by a system crash, using the merge record.
+  * <ul>
+  * <li>No merge record: there is nothing to clean up and the method returns.</li>
+  * <li>The record is empty or its last line does not end with MERGE_TEMP_SUFFIX: the record is
+  * treated as incomplete, no data file is deleted and only the record is removed.</li>
+  * <li>Otherwise the last line names the merged temp file. If that file still exists it is
+  * renamed to its final name (a failed rename aborts the clean-up and removes the record).
+  * Then the old files listed before it are deleted, followed by the record itself.</li>
+  * </ul>
+  *
+  * @throws IOException if the merge record exists but cannot be read
+  */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private void cleanLastMerge() throws IOException {
+   // if system crashes during last merge, there may be undeleted old files
+   // these files will be deleted in this method
+   File mergeRecord = getMergeFileRecord();
+   if (!mergeRecord.exists()) {
+     // no record means no unfinished merge (e.g. the very first start-up)
+     return;
+   }
+   List<String> filePaths = new ArrayList<>();
+   try (BufferedReader bufferedReader = new BufferedReader(new FileReader(mergeRecord))) {
+     bufferedReader.lines().forEach(filePaths::add);
+   }
+   // the last file should be the merged new file; an empty record has no such line
+   String lastFilePath = filePaths.isEmpty() ? null : filePaths.get(filePaths.size() - 1);
+   if (lastFilePath == null || !lastFilePath.endsWith(MERGE_TEMP_SUFFIX)) {
+     // the record is incomplete, consider last merge failed, do not delete any file
+     LOGGER.warn("The merge record is not complete, the last merge seems failed");
+     mergeRecord.delete();
+     return;
+   }
+   File mergedTempFile = new File(lastFilePath);
+   if (mergedTempFile.exists()) {
+     // the temp file has not been renamed, rename it
+     File newName = new File(mergedTempFile.getParent(),
+         mergedTempFile.getName().replace(MERGE_TEMP_SUFFIX, ""));
+     if (!mergedTempFile.renameTo(newName)) {
+       LOGGER.error("Cannot rename merged temp file {}, abort last merge",
+           mergedTempFile.getAbsolutePath());
+       mergeRecord.delete();
+       return;
+     }
+   }
+   // the merged temp file has been renamed, the old files can be safely deleted;
+   // the last entry is the temp file itself, which no longer exists under that name
+   for (String filePath : filePaths.subList(0, filePaths.size() - 1)) {
+     new File(filePath).delete();
+   }
+   // the clean-up is done; drop the record so that it is neither replayed on the next start
+   // nor appended to by the next merge
+   mergeRecord.delete();
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index a06cd76..d822edb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -58,6 +58,7 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.querycontext.SeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
import org.apache.iotdb.db.engine.sgmanager.OperationResult;
+import org.apache.iotdb.db.engine.sgmanager.StorageGroupProcessor;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.FileNodeProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -189,7 +190,6 @@ public class TsFileProcessor extends Processor {
@SuppressWarnings({"ResultOfMethodCallIgnored"})
private void initResources() throws TsFileProcessorException {
tsFileResources = new ArrayList<>();
- inverseIndexOfResource = new HashMap<>();
lastFlushedTimeForEachDevice = new HashMap<>();
minWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
maxWrittenTimeForEachDeviceInCurrentFile = new HashMap<>();
@@ -227,7 +227,7 @@ public class TsFileProcessor extends Processor {
if (unclosedFile == null) {
unclosedFile = generateNewTsFilePath();
}
-
+ buildInverseIndex();
initCurrentTsFile(unclosedFile);
}
@@ -244,6 +244,12 @@ public class TsFileProcessor extends Processor {
.parseLong(x.getName().split(FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR)[0])));
for (File tsfile : tsFiles) {
+ if (tsfile.getName().endsWith(StorageGroupProcessor.MERGE_TEMP_SUFFIX)) {
+ // remove temp file of last failed merge
+ //noinspection ResultOfMethodCallIgnored
+ tsfile.delete();
+ continue;
+ }
if (!tsfile.getName().equals(unclosedFileName)) {
addResource(tsfile);
}
@@ -267,12 +273,20 @@ public class TsFileProcessor extends Processor {
tsFileResources.add(resource);
//maintain the inverse index and fileNamePrefix
for (String device : resource.getDevices()) {
- inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
lastFlushedTimeForEachDevice
.merge(device, resource.getEndTime(device), (x, y) -> x > y ? x : y);
}
}
+ /**
+  * Rebuilds inverseIndexOfResource from scratch: afterwards every device maps to the list of
+  * resources in tsFileResources that contain it, in the order of tsFileResources.
+  * <p>
+  * The map is created on first use, because initResources() no longer assigns it before
+  * calling this method. An existing map is cleared and refilled in place.
+  */
+ public void buildInverseIndex() {
+   if (inverseIndexOfResource == null) {
+     inverseIndexOfResource = new HashMap<>();
+   } else {
+     inverseIndexOfResource.clear();
+   }
+   for (TsFileResource resource : tsFileResources) {
+     for (String device : resource.getDevices()) {
+       inverseIndexOfResource.computeIfAbsent(device, k -> new ArrayList<>()).add(resource);
+     }
+   }
+ }
+
private File generateNewTsFilePath() throws TsFileProcessorException {
String dataDir = getNextDataFolder();
@@ -950,4 +964,30 @@ public class TsFileProcessor extends Processor {
public List<TsFileResource> getTsFileResources() {
return tsFileResources;
}
+
+ /**
+  * Merge method: rebuilds tsFileResources without 'oldFiles', putting 'newFile' at the position
+  * of the first old file. If 'newFile' is null, the old files are just removed.
+  * <p>
+  * Old files are matched by reference, one after another in a single pass, so they are only
+  * recognised when they occur in tsFileResources in the same relative order as in 'oldFiles'.
+  * 'newFile' is inserted only when the first old file is found; in particular it is not added
+  * when 'oldFiles' is empty. inverseIndexOfResource is not updated by this method.
+  *
+  * @param oldFiles the resources to remove, ordered as in tsFileResources
+  * @param newFile the resource that takes the place of the first old file, or null to only
+  *     remove the old files
+  */
+ public void replaceFiles(List<TsFileResource> oldFiles, TsFileResource newFile) {
+   List<TsFileResource> remaining = new ArrayList<>();
+   int matched = 0;
+   for (TsFileResource current : tsFileResources) {
+     // the next old file that is still expected, or null once all of them have been seen
+     TsFileResource expected = null;
+     if (matched < oldFiles.size()) {
+       expected = oldFiles.get(matched);
+     }
+     if (current != expected) {
+       // not the old file we are looking for, keep it
+       remaining.add(current);
+       continue;
+     }
+     // the new file takes over the slot of the first old file; later old files just vanish
+     boolean firstMatch = matched == 0;
+     if (firstMatch && newFile != null) {
+       remaining.add(newFile);
+     }
+     matched++;
+   }
+   tsFileResources = remaining;
+ }
}
\ No newline at end of file