You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/20 02:56:54 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated:
refactor recovery of SeqTsFiles
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 2cfbb3f refactor recovery of SeqTsFiles
2cfbb3f is described below
commit 2cfbb3fc2e961aaf362aa835b698fe2dd9dd06ca
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jun 20 10:54:50 2019 +0800
refactor recovery of SeqTsFiles
---
.../db/engine/bufferwrite/BufferWriteProcessor.java | 16 +++++++---------
.../iotdb/db/engine/filenode/FileNodeProcessor.java | 17 +++++++++--------
.../db/writelog/recover/TsFileRecoverPerformer.java | 21 ++++++++++++++-------
3 files changed, 30 insertions(+), 24 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index c9129ef..d16dc32 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -144,15 +144,6 @@ public class BufferWriteProcessor extends Processor {
this.insertFilePath = Paths.get(baseDir, processorName, fileName).toString();
bufferWriteRelativePath = processorName + File.separatorChar + fileName;
- TsFileRecoverPerformer recoverPerformer =
- new TsFileRecoverPerformer(insertFilePath, logNodePrefix(),
- fileSchema, versionController, currentTsFileResource,
- currentTsFileResource != null ? currentTsFileResource.getModFile() : null);
- try {
- recoverPerformer.recover();
- } catch (ProcessorException e) {
- throw new BufferWriteProcessorException(e);
- }
open();
try {
getLogNode();
@@ -482,6 +473,9 @@ public class BufferWriteProcessor extends Processor {
@Override
public synchronized void close() throws BufferWriteProcessorException {
+ if (writer == null) {
+ return;
+ }
try {
// flushMetadata data (if there are flushing task, flushMetadata() will be blocked) and wait for finishing flushMetadata async
LOGGER.info("Submit a BufferWrite ({}) setCloseMark task.", getProcessorName());
@@ -590,6 +584,10 @@ public class BufferWriteProcessor extends Processor {
}
public String logNodePrefix() {
+ return logNodePrefix(processorName);
+ }
+
+ public static String logNodePrefix(String processorName) {
return processorName + "-BufferWrite-";
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 082063a..3177431 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -88,6 +88,7 @@ import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
@@ -470,15 +471,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
for (int i = 0; i < newFileNodes.size(); i++) {
TsFileResource tsFile = newFileNodes.get(i);
- String baseDir = directories
- .getTsFileFolder(tsFile.getBaseDirIndex());
try {
- // recover in initialization
- new BufferWriteProcessor(baseDir,
- getProcessorName(),
- tsFile.getFile().getName(), parameters, bufferwriteCloseConsumer,
- versionController, fileSchema, tsFile);
- } catch (BufferWriteProcessorException e) {
+ String filePath = tsFile.getFilePath();
+ String logNodePrefix = BufferWriteProcessor.logNodePrefix(processorName);
+ TsFileRecoverPerformer recoverPerformer =
+ new TsFileRecoverPerformer(filePath, logNodePrefix,
+ fileSchema, versionController, tsFile,
+ tsFile.getModFile());
+ recoverPerformer.recover();
+ } catch (ProcessorException e) {
LOGGER.error(
"The filenode processor {} failed to recover the bufferwrite processor, "
+ "the last bufferwrite file is {}.",
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 37b8c6f..8041aa4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
@@ -43,6 +45,7 @@ public class TsFileRecoverPerformer {
private VersionController versionController;
private LogReplayer logReplayer;
private IMemTable recoverMemTable;
+ private TsFileResource tsFileResource;
public TsFileRecoverPerformer(String insertFilePath, String processorName,
FileSchema fileSchema, VersionController versionController,
@@ -56,12 +59,20 @@ public class TsFileRecoverPerformer {
currentTsFileResource, fileSchema, recoverMemTable);
}
- public boolean recover() throws ProcessorException {
+ public void recover() throws ProcessorException {
File insertFile = new File(insertFilePath);
if (!insertFile.exists()) {
- return false;
+ return;
}
NativeRestorableIOWriter restorableTsFileIOWriter = recoverFile(insertFile);
+ // due to failure, the last ChunkGroup may contain the same data with the WALs, so the time
+ // map must be updated first to avoid duplicated insertion
+ for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
+ for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+ tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
+ tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+ }
+ }
logReplayer.replayLogs();
@@ -77,15 +88,11 @@ public class TsFileRecoverPerformer {
removeTruncatePosition(insertFile);
- WriteLogNode logNode;
try {
- logNode = MultiFileLogNodeManager.getInstance().getNode(
- processorName + new File(insertFilePath).getName());
- logNode.delete();
+ MultiFileLogNodeManager.getInstance().deleteNode(processorName + new File(insertFilePath).getName());
} catch (IOException e) {
throw new ProcessorException(e);
}
- return true;
}