You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/24 11:15:40 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix
recover process
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new c59bf4a fix recover process
c59bf4a is described below
commit c59bf4a443e4f210c5672274cc908bd6c851ebaf
Author: qiaojialin <64...@qq.com>
AuthorDate: Mon Jun 24 19:15:27 2019 +0800
fix recover process
---
.../db/engine/filenodeV2/FileNodeManagerV2.java | 1 +
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 15 ++++--
.../db/engine/filenodeV2/TsFileResourceV2.java | 59 ++++++++++++++++++++++
.../filenodeV2/UnsealedTsFileProcessorV2.java | 3 ++
.../recover/SeqTsFileRecoverPerformer.java | 31 ++++++++----
.../tsfile/common/constant/SystemConstant.java | 1 +
.../iotdb/tsfile/read/TsFileRestorableReader.java | 2 +-
.../apache/iotdb/tsfile/write/TsFileWriter.java | 10 ----
.../write/writer/NativeRestorableIOWriter.java | 37 ++++++++------
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 1 -
.../write/writer/NativeRestorableIOWriterTest.java | 4 +-
11 files changed, 121 insertions(+), 43 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 3a8f4f0..b27e172 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -365,6 +365,7 @@ public class FileNodeManagerV2 implements IService {
}
/**
+ * flush command
* Sync asyncCloseOneProcessor all file node processors.
*/
public void syncCloseAllProcessor() throws FileNodeManagerException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 40a9151..890a9da 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine.filenodeV2;
+import static org.apache.iotdb.tsfile.common.constant.SystemConstant.TSFILE_SUFFIX;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
@@ -33,6 +35,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
@@ -144,7 +147,7 @@ public class FileNodeProcessorV2 {
if (!fileFolder.exists()) {
continue;
}
- for (File tsfile: fileFolder.listFiles(file->!file.getName().contains("mods"))) {
+ for (File tsfile: fileFolder.listFiles(file->file.getName().endsWith(TSFILE_SUFFIX))) {
tsFiles.add(tsfile);
}
}
@@ -157,11 +160,16 @@ public class FileNodeProcessorV2 {
if (!fileFolder.exists()) {
continue;
}
- for (File tsfile: fileFolder.listFiles(file->!file.getName().contains("mods"))) {
+ for (File tsfile: fileFolder.listFiles(file->file.getName().endsWith(TSFILE_SUFFIX))) {
tsFiles.add(tsfile);
}
}
recoverUnseqFiles(tsFiles);
+
+ for (TsFileResourceV2 resource: sequenceFileList) {
+ latestTimeForEachDevice.putAll(resource.getEndTimeMap());
+ latestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
+ }
}
private void recoverSeqFiles(List<File> tsfiles) throws ProcessorException {
@@ -304,7 +312,8 @@ public class FileNodeProcessorV2 {
new File(baseDir, storageGroupName).mkdirs();
String filePath = Paths.get(baseDir, storageGroupName,
- System.currentTimeMillis() + "-" + versionController.nextVersion()).toString();
+ System.currentTimeMillis() + "-" + versionController.nextVersion()).toString()
+ + TSFILE_SUFFIX;
return new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback, this::updateLatestFlushTimeCallback);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index 86abcf2..2bfb04f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@ -18,19 +18,33 @@
*/
package org.apache.iotdb.db.engine.filenodeV2;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
public class TsFileResourceV2 {
private File file;
+ public static final String RESOURCE_SUFFIX = ".resource";
+
/**
* device -> start time
*/
@@ -81,6 +95,51 @@ public class TsFileResourceV2 {
this.readOnlyMemChunk = readOnlyMemChunk;
}
+ public TsFileResourceV2(File file, Map<String, Long> startTimeMap, Map<String, Long> endTimeMap) {
+ this.file = file;
+ this.startTimeMap = startTimeMap;
+ this.endTimeMap = endTimeMap;
+ this.closed = true;
+ }
+
+
+ public void serialize() throws IOException {
+ try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file + RESOURCE_SUFFIX))){
+ ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
+ for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ ReadWriteIOUtils.write(this.endTimeMap.size(), outputStream);
+ for (Entry<String, Long> entry : this.endTimeMap.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ }
+ }
+
+
+ public void deSerialize() throws IOException {
+ try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file + RESOURCE_SUFFIX))) {
+ int size = ReadWriteIOUtils.readInt(inputStream);
+ Map<String, Long> startTimes = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ String path = ReadWriteIOUtils.readString(inputStream);
+ long time = ReadWriteIOUtils.readLong(inputStream);
+ startTimes.put(path, time);
+ }
+ size = ReadWriteIOUtils.readInt(inputStream);
+ Map<String, Long> endTimes = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ String path = ReadWriteIOUtils.readString(inputStream);
+ long time = ReadWriteIOUtils.readLong(inputStream);
+ endTimes.put(path, time);
+ }
+ this.startTimeMap = startTimes;
+ this.endTimeMap = endTimes;
+ }
+ }
+
public void updateStartTime(String device, long time) {
startTimeMap.putIfAbsent(device, time);
long startTime = startTimeMap.get(device);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index d3fde32..bac903d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -323,6 +323,9 @@ public class UnsealedTsFileProcessorV2 {
private void endFile() throws IOException {
long closeStartTime = System.currentTimeMillis();
+
+ tsFileResource.serialize();
+
writer.endFile(fileSchema);
//FIXME suppose the flush-thread-pool is 2.
// then if a flush task and a endFile task are running in the same time
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
index 6f1f342..95703a0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
@@ -19,12 +19,12 @@
package org.apache.iotdb.db.writelog.recover;
+import static org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2.RESOURCE_SUFFIX;
+
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.channels.FileChannel;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
+import org.apache.iotdb.db.engine.filenodeV2.UnsealedTsFileProcessorV2;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -78,17 +78,27 @@ public class SeqTsFileRecoverPerformer {
// remove corrupted part of the TsFile
NativeRestorableIOWriter restorableTsFileIOWriter = null;
try {
- restorableTsFileIOWriter = new NativeRestorableIOWriter(insertFile, true);
+ restorableTsFileIOWriter = new NativeRestorableIOWriter(insertFile);
} catch (IOException e) {
throw new ProcessorException(e);
}
- // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
- // map must be updated first to avoid duplicated insertion
- for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
- for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
- tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
- tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+ if (!restorableTsFileIOWriter.hasCrashed()) {
+ try {
+ tsFileResource.deSerialize();
+ return;
+ } catch (IOException e) {
+ throw new ProcessorException("recover the resource file failed: " + insertFilePath
+ + RESOURCE_SUFFIX, e);
+ }
+ } else {
+ // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
+ // map must be updated first to avoid duplicated insertion
+ for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
+ for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+ tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
+ tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+ }
}
}
@@ -106,6 +116,7 @@ public class SeqTsFileRecoverPerformer {
// close file
try {
restorableTsFileIOWriter.endFile(fileSchema);
+ tsFileResource.serialize();
} catch (IOException e) {
throw new ProcessorException("Cannot setCloseMark file when recovering", e);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/SystemConstant.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/SystemConstant.java
index d3a6f56..fe3063b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/SystemConstant.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/SystemConstant.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.common.constant;
public class SystemConstant {
+ public static final String TSFILE_SUFFIX = ".tsfile";
public static final String TSFILE_HOME = "TSFILE_HOME";
public static final String TSFILE_CONF = "TSFILE_CONF";
public static final String PATH_SEPARATOR = ".";
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileRestorableReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileRestorableReader.java
index 44b29a6..16b339c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileRestorableReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileRestorableReader.java
@@ -61,7 +61,7 @@ public class TsFileRestorableReader extends TsFileSequenceReader {
if (!isComplete()) {
// Try to close it
LOGGER.info("File {} has no correct tail magic, try to repair...", file);
- NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(new File(file), false);
+ NativeRestorableIOWriter rWriter = new NativeRestorableIOWriter(new File(file));
TsFileWriter writer = new TsFileWriter(rWriter);
// This writes the right magic string
writer.close();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index 52cfdf9..b626020 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -118,16 +118,6 @@ public class TsFileWriter implements AutoCloseable{
* init this TsFileWriter.
*
* @param file the File to be written by this TsFileWriter
- * @param conf the configuration of this TsFile
- */
- public TsFileWriter(File file, TSFileConfig conf) throws IOException {
- this(new TsFileIOWriter(file), new FileSchema(), conf);
- }
-
- /**
- * init this TsFileWriter.
- *
- * @param file the File to be written by this TsFileWriter
* @param schema the schema of this TsFile
* @param conf the configuration of this TsFile
*/
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
index 5d4b4e4..6b86691 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriter.java
@@ -48,6 +48,8 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
private int lastFlushedChunkGroupIndex = 0;
+ private boolean crashed;
+
/**
* all chunk group metadata which have been serialized on disk.
*/
@@ -58,40 +60,40 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
return truncatedPosition;
}
- public NativeRestorableIOWriter(File file) throws IOException {
- this(file, false);
- }
-
/**
* @param file a given tsfile path you want to (continue to) write
- * @param append if true, then the file can support appending data even though the file is complete (i.e., tail magic string exists)
* @throws IOException if write failed, or the file is broken but autoRepair==false.
*/
- public NativeRestorableIOWriter(File file, boolean append) throws IOException {
- super();
+ public NativeRestorableIOWriter(File file) throws IOException {
this.out = new DefaultTsFileOutput(file, true);
+
+ // file doesn't exist
if (file.length() == 0) {
- //this is a new file
+ startFile();
return;
}
+
if (file.exists()) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- if (reader.isComplete() && !append) {
+
+ // this tsfile is complete
+ if (reader.isComplete()) {
+ crashed = false;
canWrite = false;
out.close();
return;
}
- truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetaDataList, !append);
- if (truncatedPosition == TsFileCheckStatus.COMPLETE_FILE && !append) {
- this.canWrite = false;
- out.close();
- } else if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
+
+ // uncompleted file
+ truncatedPosition = reader.selfCheck(knownSchemas, chunkGroupMetaDataList, true);
+ if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
out.close();
- throw new IOException(
- String.format("%s is not in TsFile format.", file.getAbsolutePath()));
+ throw new IOException(String.format("%s is not in TsFile format.", file.getAbsolutePath()));
} else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
+ crashed = true;
out.truncate(TSFileConfig.MAGIC_STRING.length());
} else {
+ crashed = true;
//remove broken data
out.truncate(truncatedPosition + 1);
}
@@ -157,6 +159,9 @@ public class NativeRestorableIOWriter extends TsFileIOWriter {
}
}
+ public boolean hasCrashed() {
+ return crashed;
+ }
/**
* get all the chunkGroups' metadata which are appended after the last calling of this method, or
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 19995c7..4c491a7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -88,7 +88,6 @@ public class TsFileIOWriter {
*/
public TsFileIOWriter(File file) throws IOException {
this.out = new DefaultTsFileOutput(file);
- startFile();
}
/**
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
index 64b6376..b75f84f 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/NativeRestorableIOWriterTest.java
@@ -80,7 +80,7 @@ public class NativeRestorableIOWriterTest {
rWriter = new NativeRestorableIOWriter(file);
assertEquals(TsFileCheckStatus.COMPLETE_FILE, rWriter.getTruncatedPosition());
assertFalse(rWriter.canWrite());
- rWriter = new NativeRestorableIOWriter(file, true);
+ rWriter = new NativeRestorableIOWriter(file);
assertEquals(TSFileConfig.MAGIC_STRING.length(), rWriter.getTruncatedPosition());
writer = new TsFileWriter(rWriter);
writer.close();
@@ -322,7 +322,7 @@ public class NativeRestorableIOWriterTest {
assertFalse(rWriter.canWrite());
rWriter.close();
- rWriter = new NativeRestorableIOWriter(file, true);
+ rWriter = new NativeRestorableIOWriter(file);
assertTrue(rWriter.canWrite());
writer = new TsFileWriter(rWriter);
writer.write(new TSRecord(3, "d1").addTuple(new FloatDataPoint("s1", 5))