You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/24 13:29:14 UTC
[incubator-iotdb] 01/02: refactor RecoveryPerformer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 44561c9157861e7dfe607fc4d5c0a6c9d09675cb
Author: 江天 <jt...@163.com>
AuthorDate: Mon Jun 24 21:26:20 2019 +0800
refactor RecoveryPerformer
---
.../db/engine/filenode/FileNodeProcessor.java | 6 +-
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 11 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 13 ++-
...rPerformer.java => TsFileRecoverPerformer.java} | 12 ++-
.../recover/UnSeqTsFileRecoverPerformer.java | 117 ---------------------
.../db/writelog/recover/SeqTsFileRecoverTest.java | 5 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 4 +-
7 files changed, 31 insertions(+), 137 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 21a4321..e7df8dd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -88,7 +88,7 @@
//import org.apache.iotdb.db.utils.MemUtils;
//import org.apache.iotdb.db.utils.QueryUtils;
//import org.apache.iotdb.db.utils.TimeValuePair;
-//import org.apache.iotdb.db.writelog.recover.SeqTsFileRecoverPerformer;
+//import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
//import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
//import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
//import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
@@ -472,8 +472,8 @@
//// try {
//// String filePath = tsFile.getFilePath();
//// String logNodePrefix = BufferWriteProcessor.logNodePrefix(processorName);
-//// SeqTsFileRecoverPerformer recoverPerformer =
-//// new SeqTsFileRecoverPerformer(logNodePrefix,
+//// TsFileRecoverPerformer recoverPerformer =
+//// new TsFileRecoverPerformer(logNodePrefix,
//// fileSchema, versionController, tsFile);
//// recoverPerformer.recover();
//// } catch (ProcessorException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 42aeb87..97f8966 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -48,8 +47,7 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.writelog.recover.SeqTsFileRecoverPerformer;
-import org.apache.iotdb.db.writelog.recover.UnSeqTsFileRecoverPerformer;
+import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -169,7 +167,8 @@ public class FileNodeProcessorV2 {
for (File tsfile: tsfiles) {
TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsfile);
sequenceFileList.add(tsFileResource);
- SeqTsFileRecoverPerformer recoverPerformer = new SeqTsFileRecoverPerformer(storageGroupName + "-", fileSchema, versionController, tsFileResource);
+ TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
+ , fileSchema, versionController, tsFileResource, false);
recoverPerformer.recover();
}
}
@@ -179,8 +178,8 @@ public class FileNodeProcessorV2 {
for (File tsfile: tsfiles) {
TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsfile);
unSequenceFileList.add(tsFileResource);
- UnSeqTsFileRecoverPerformer recoverPerformer = new UnSeqTsFileRecoverPerformer(storageGroupName + "-", fileSchema,
- versionController, tsFileResource);
+ TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-", fileSchema,
+ versionController, tsFileResource, true);
recoverPerformer.recover();
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 65cdc3f..ded5d40 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.writelog.recover;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -60,6 +62,9 @@ public class LogReplayer {
// overflow file tolerates duplicated data
private boolean acceptDuplication;
+ private Map<String, Long> tempStartTimeMap = new HashMap<>();
+ private Map<String, Long> tempEndTimeMap = new HashMap<>();
+
public LogReplayer(String logNodePrefix, String insertFilePath,
ModificationFile modFile,
VersionController versionController,
@@ -103,6 +108,8 @@ public class LogReplayer {
} catch (IOException e) {
throw new ProcessorException("Cannot replay logs", e);
}
+ tempStartTimeMap.forEach((k, v) -> currentTsFileResource.updateTime(k, v));
+ tempEndTimeMap.forEach((k, v) -> currentTsFileResource.updateTime(k, v));
}
private void replayDelete(DeletePlan deletePlan) throws IOException {
@@ -121,7 +128,11 @@ public class LogReplayer {
!acceptDuplication) {
return;
}
- currentTsFileResource.updateTime(insertPlan.getDeviceId(), insertPlan.getTime());
+ tempStartTimeMap.putIfAbsent(insertPlan.getDeviceId(), insertPlan.getTime());
+ Long endTime = tempEndTimeMap.get(insertPlan.getDeviceId());
+ if (endTime == null || endTime < insertPlan.getTime()) {
+ tempEndTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime());
+ }
}
String[] measurementList = insertPlan.getMeasurements();
TSDataType[] dataTypes = new TSDataType[measurementList.length];
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
similarity index 92%
rename from iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
rename to iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 6f1f342..84579c8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -37,10 +37,10 @@ import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
/**
- * SeqTsFileRecoverPerformer recovers a SeqTsFile to correct status, redoes the WALs since last
+ * TsFileRecoverPerformer recovers a TsFile (sequence or unsequence) to correct status, redoes the WALs since last
* crash and removes the redone logs.
*/
-public class SeqTsFileRecoverPerformer {
+public class TsFileRecoverPerformer {
private String insertFilePath;
private String logNodePrefix;
@@ -48,15 +48,17 @@ public class SeqTsFileRecoverPerformer {
private VersionController versionController;
private LogReplayer logReplayer;
private TsFileResourceV2 tsFileResource;
+ private boolean acceptUnseq;
- public SeqTsFileRecoverPerformer(String logNodePrefix,
+ public TsFileRecoverPerformer(String logNodePrefix,
FileSchema fileSchema, VersionController versionController,
- TsFileResourceV2 currentTsFileResource) {
+ TsFileResourceV2 currentTsFileResource, boolean acceptUnseq) {
this.insertFilePath = currentTsFileResource.getFile().getPath();
this.logNodePrefix = logNodePrefix;
this.fileSchema = fileSchema;
this.versionController = versionController;
this.tsFileResource = currentTsFileResource;
+ this.acceptUnseq = acceptUnseq;
}
/**
@@ -70,7 +72,7 @@ public class SeqTsFileRecoverPerformer {
IMemTable recoverMemTable = new PrimitiveMemTable();
this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
versionController,
- tsFileResource, fileSchema, recoverMemTable, false);
+ tsFileResource, fileSchema, recoverMemTable, acceptUnseq);
File insertFile = new File(insertFilePath);
if (!insertFile.exists()) {
return;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
deleted file mode 100644
index 116fadf..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.writelog.recover;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
-
-/**
- * SeqTsFileRecoverPerformer recovers a SeqTsFile to correct status, redoes the WALs since last
- * crash and removes the redone logs.
- */
-public class UnSeqTsFileRecoverPerformer {
-
- private String insertFilePath;
- private String logNodePrefix;
- private FileSchema fileSchema;
- private VersionController versionController;
- private LogReplayer logReplayer;
- private TsFileResourceV2 tsFileResource;
-
- public UnSeqTsFileRecoverPerformer(String logNodePrefix,
- FileSchema fileSchema, VersionController versionController,
- TsFileResourceV2 currentTsFileResource) {
- this.insertFilePath = currentTsFileResource.getFile().getPath();
- this.logNodePrefix = logNodePrefix;
- this.fileSchema = fileSchema;
- this.versionController = versionController;
- this.tsFileResource = currentTsFileResource;
- }
-
- /**
- * 1. redo the WALs to recover unpersisted data
- * 2. flush and close the file
- * 3. clean WALs
- * @throws ProcessorException
- */
- public void recover() throws ProcessorException {
- IMemTable recoverMemTable = new PrimitiveMemTable();
- this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
- versionController,
- tsFileResource, fileSchema, recoverMemTable, true);
- File insertFile = new File(insertFilePath);
- if (!insertFile.exists()) {
- return;
- }
- // remove corrupted part of the TsFile
- NativeRestorableIOWriter restorableTsFileIOWriter = null;
- try {
- restorableTsFileIOWriter = new NativeRestorableIOWriter(insertFile);
- } catch (IOException e) {
- throw new ProcessorException(e);
- }
-
- for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
- for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
- tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
- tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
- }
- }
-
- // redo logs
- logReplayer.replayLogs();
- if (recoverMemTable.isEmpty()) {
- return;
- }
-
- // flush logs
- MemTableFlushTask tableFlushTask = new MemTableFlushTask(restorableTsFileIOWriter,
- logNodePrefix, 0, (a,b) -> {});
- tableFlushTask.flushMemTable(fileSchema, recoverMemTable, versionController.nextVersion());
-
- // close file
- try {
- restorableTsFileIOWriter.endFile(fileSchema);
- } catch (IOException e) {
- throw new ProcessorException("Cannot setCloseMark file when recovering", e);
- }
-
- // clean logs
- try {
- MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
- } catch (IOException e) {
- throw new ProcessorException(e);
- }
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index 334d690..f622299 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -122,8 +121,8 @@ public class SeqTsFileRecoverTest {
@Test
public void test() throws ProcessorException, IOException {
- SeqTsFileRecoverPerformer performer = new SeqTsFileRecoverPerformer(logNodePrefix, schema,
- versionController, resource);
+ TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
+ versionController, resource, true);
performer.recover();
ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 9cb55f0..8f49300 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -127,8 +127,8 @@ public class UnseqTsFileRecoverTest {
@Test
public void test() throws ProcessorException, IOException {
- UnSeqTsFileRecoverPerformer performer = new UnSeqTsFileRecoverPerformer(logNodePrefix, schema,
- versionController, resource);
+ TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, schema,
+ versionController, resource, true);
performer.recover();
TsFileSequenceReader fileReader = new TsFileSequenceReader(tsF.getPath(), true);