You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/21 02:57:01 UTC
[incubator-iotdb] 01/01: recover for restart
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch hot_compaction
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 34651d01766fa7313b1071ae79d10f9579ffa3a9
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jul 21 10:56:49 2020 +0800
recover for restart
---
.../writelog/recover/TsFileRecoverPerformer.java | 142 ++++++++++-----------
1 file changed, 64 insertions(+), 78 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 1a146d6..b4b2cdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.writelog.recover;
import static org.apache.iotdb.db.engine.flush.MemTableFlushTask.getFlushLogFile;
-import static org.apache.iotdb.db.engine.flush.VmLogger.isVMLoggerFileExist;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
import java.io.File;
@@ -33,6 +32,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -63,14 +63,14 @@ public class TsFileRecoverPerformer {
private String filePath;
private String logNodePrefix;
private VersionController versionController;
- private TsFileResource resource;
+ private TsFileResource tsFileResource;
private boolean sequence;
private boolean isLastFile;
private List<List<TsFileResource>> vmTsFileResources;
/**
- * @param isLastFile whether this TsFile is the last file of its partition
+ * @param isLastFile whether this TsFile is the last file of its partition
* @param vmTsFileResources only last file could have non-empty vmTsFileResources
*/
public TsFileRecoverPerformer(String logNodePrefix, VersionController versionController,
@@ -79,7 +79,7 @@ public class TsFileRecoverPerformer {
this.filePath = currentTsFileResource.getTsFilePath();
this.logNodePrefix = logNodePrefix;
this.versionController = versionController;
- this.resource = currentTsFileResource;
+ this.tsFileResource = currentTsFileResource;
this.sequence = sequence;
this.isLastFile = isLastFile;
this.vmTsFileResources = vmTsFileResources;
@@ -109,16 +109,11 @@ public class TsFileRecoverPerformer {
logger.error("TsFile {} is missing, will skip its recovery.", filePath);
return null;
}
- for (List<File> subVmFileList : vmFileList) {
- for (File vmFile : subVmFileList) {
- if (!vmFile.exists()) {
- logger.error("VMFile {} is missing, will skip its recovery.", vmFile.getPath());
- return null;
- }
- }
- }
+
// remove corrupted part of the TsFile
RestorableTsFileIOWriter restorableTsFileIOWriter;
+
+ // create RestorableTsFileIOWriter for all vmfiles in each level
List<List<RestorableTsFileIOWriter>> vmRestorableTsFileIOWriterList = new ArrayList<>();
try {
restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
@@ -126,7 +121,10 @@ public class TsFileRecoverPerformer {
vmRestorableTsFileIOWriterList.add(new ArrayList<>());
for (int j = 0; j < vmTsFileResources.get(i).size(); j++) {
file = vmFileList.get(i).get(j);
- vmRestorableTsFileIOWriterList.get(i).add(new RestorableTsFileIOWriter(file));
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(file);
+ // recover vmfiles' metadata
+ recoverResourceFromWriter(writer, vmTsFileResources.get(i).get(j));
+ vmRestorableTsFileIOWriterList.get(i).add(writer);
}
}
} catch (NotCompatibleTsFileException e) {
@@ -137,21 +135,11 @@ public class TsFileRecoverPerformer {
throw new StorageGroupProcessorException(e);
}
- RestorableTsFileIOWriter lastRestorableTsFileIOWriter =
- vmTsFileResources.isEmpty() ? restorableTsFileIOWriter
- : vmRestorableTsFileIOWriterList.get(0)
- .get(vmRestorableTsFileIOWriterList.get(0).size() - 1);
-
- TsFileResource lastTsFileResource = vmTsFileResources.isEmpty() ? resource
- : vmTsFileResources.get(0).get(vmTsFileResources.get(0).size() - 1);
-
- boolean isComplete =
- !lastRestorableTsFileIOWriter.hasCrashed() && !lastRestorableTsFileIOWriter.canWrite();
-
- if (isComplete) {
- // tsfile is complete, vmfile is never complete because it's canWrite() always return true.
+ // judge whether tsfile is complete
+ if (!restorableTsFileIOWriter.hasCrashed()) {
+ // if tsfile is complete, then there are no vmfiles.
try {
- recoverResource(resource);
+ recoverResource(tsFileResource);
return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
} catch (IOException e) {
throw new StorageGroupProcessorException(
@@ -159,61 +147,59 @@ public class TsFileRecoverPerformer {
+ RESOURCE_SUFFIX + e);
}
} else {
- if (!vmTsFileResources.isEmpty()) {
- for (int i = 0; i < vmTsFileResources.size(); i++) {
- for (int j = 0; j < vmTsFileResources.get(i).size(); j++) {
- recoverResourceFromWriter(vmRestorableTsFileIOWriterList.get(i).get(j),
- vmTsFileResources.get(i).get(j));
- }
- }
- recoverResourceFromWriter(restorableTsFileIOWriter, resource);
- boolean vmFileNotCrashed = !getFlushLogFile(restorableTsFileIOWriter).exists();
- // if the last file in vmTsFileResources is not crashed
- if (vmFileNotCrashed) {
- try {
- boolean tsFileNotCrashed = !isVMLoggerFileExist(restorableTsFileIOWriter);
- // tsfile is not crash
- if (tsFileNotCrashed) {
+ // tsfile has crashed
+ // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
+ // map must be updated first to avoid duplicated insertion
+ recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource);
+ }
+
+ // If the vm is not enabled, the walTargetWriter points to the tsfile.
+ // If the vm is enabled and the flush log exists, the walTargetWriter points to the vm of the flush log.
+ // If the vm is enabled and the flush log does not exist, the walTargetWriter is null.
+ RestorableTsFileIOWriter walTargetWriter = null;
+ TsFileResource walTargetResource = null;
- // if wal exists, we should open a new vmfile to replay it
- File newVmFile = resource.getProcessor().createNewVMFile(resource, 0);
- TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
- RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
- if (redoLogs(newVMWriter, newVmTsFileResource)) {
- vmTsFileResources.get(0).add(newVmTsFileResource);
- vmRestorableTsFileIOWriterList.get(0).add(newVMWriter);
- } else {
- Files.delete(newVmFile.toPath());
- }
- } else {
- IMemTable recoverMemTable = new PrimitiveMemTable();
- recoverMemTable.setVersion(versionController.nextVersion());
- LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath,
- resource.getModFile(),
- versionController, resource, recoverMemTable, sequence);
- logReplayer.replayLogs();
- }
- // clean logs
- MultiFileLogNodeManager.getInstance().deleteNode(
- logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
- updateTsFileResource();
- return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
- } catch (IOException e) {
- throw new StorageGroupProcessorException(
- "recover the resource file failed: " + filePath
- + RESOURCE_SUFFIX + e);
- }
+ boolean enableVM = IoTDBDescriptor.getInstance().getConfig().isEnableVm();
+ boolean flushLogExist = getFlushLogFile(restorableTsFileIOWriter).exists();
+
+ if (!enableVM) {
+ walTargetWriter = restorableTsFileIOWriter;
+ walTargetResource = tsFileResource;
+ } else if (flushLogExist) {
+ walTargetWriter = vmRestorableTsFileIOWriterList.get(0)
+ .get(vmRestorableTsFileIOWriterList.get(0).size() - 1);
+ walTargetResource = vmTsFileResources.get(0).get(vmTsFileResources.get(0).size() - 1);
+ }
+
+
+ if (walTargetWriter == null) {
+ try {
+ // if wal exists, we should open a new vmfile to replay it
+ File newVmFile = tsFileResource.getProcessor().createNewVMFile(tsFileResource, 0);
+ TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
+ RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
+ if (redoLogs(newVMWriter, newVmTsFileResource)) {
+ // recover metadata for new vmfile
+ recoverResourceFromWriter(newVMWriter, newVmTsFileResource);
+ vmTsFileResources.get(0).add(newVmTsFileResource);
+ vmRestorableTsFileIOWriterList.get(0).add(newVMWriter);
+ // clean logs
+ MultiFileLogNodeManager.getInstance().deleteNode(
+ logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+ updateTsFileResource();
+ } else {
+ Files.delete(newVmFile.toPath());
}
- } else {
- // tsfile has crashed
- // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
- // map must be updated first to avoid duplicated insertion
- recoverResourceFromWriter(lastRestorableTsFileIOWriter, lastTsFileResource);
+ return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
+ } catch (IOException e) {
+ throw new StorageGroupProcessorException(
+ "recover the resource file failed: " + filePath
+ + RESOURCE_SUFFIX + e);
}
}
// redo logs
- redoLogs(lastRestorableTsFileIOWriter, lastTsFileResource);
+ redoLogs(walTargetWriter, walTargetResource);
// clean logs
try {
@@ -232,8 +218,8 @@ public class TsFileRecoverPerformer {
for (Entry<String, Integer> entry : tsFileResource.getDeviceToIndexMap().entrySet()) {
String device = entry.getKey();
int index = entry.getValue();
- resource.updateStartTime(device, tsFileResource.getStartTime(index));
- resource.updateEndTime(device, tsFileResource.getEndTime(index));
+ this.tsFileResource.updateStartTime(device, tsFileResource.getStartTime(index));
+ this.tsFileResource.updateEndTime(device, tsFileResource.getEndTime(index));
}
}
}