You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/20 03:38:53 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add
recover performer for overflow
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new fe205c6 add recover performer for overflow
fe205c6 is described below
commit fe205c6a4fb9de9b8c966a45767e908d67072214
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jun 20 11:36:49 2019 +0800
add recover performer for overflow
---
.../db/engine/overflow/io/OverflowProcessor.java | 20 ++--
.../db/engine/overflow/io/OverflowResource.java | 10 +-
.../recover/SeqTsFileRecoverPerformer.java | 21 ++---
.../recover/UnseqTsFileRecoverPerformer.java | 102 +++++++++++++++++++++
4 files changed, 127 insertions(+), 26 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 4042654..43f2700 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -66,6 +66,7 @@ import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.db.writelog.recover.LogReplayer;
+import org.apache.iotdb.db.writelog.recover.UnseqTsFileRecoverPerformer;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -194,19 +195,12 @@ public class OverflowProcessor extends Processor {
throw new ProcessorException(e);
}
- IMemTable memTable = new PrimitiveMemTable();
- LogReplayer replayer = new LogReplayer(processorName, workResource.getInsertFilePath(),
- workResource.getModificationFile(), versionController, null, fileSchema,
- memTable);
- replayer.replayLogs();
- flushTask("recover asyncFlush", memTable, 0, (a,b) -> {});
- try {
- WriteLogNode node = MultiFileLogNodeManager.getInstance().getNode(
- workResource.logNodePrefix() + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
- node.delete();
- } catch (IOException e) {
- throw new ProcessorException(e);
- }
+ UnseqTsFileRecoverPerformer recoverPerformer =
+ new UnseqTsFileRecoverPerformer(workResource.logNodePrefix(),
+ workResource.getInsertFilePath(),
+ workResource.getModificationFile(), versionController, fileSchema,
+ workResource.getInsertIO(),workResource.getAppendInsertMetadatas());
+ recoverPerformer.recover();
}
private String[] clearFile(String[] subFilePaths) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index 859fff0..8722db2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -344,7 +344,7 @@ public class OverflowResource {
if (logNode == null) {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
logNode = MultiFileLogNodeManager.getInstance().getNode(
- logNodePrefix() + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
+ logNodePrefix() + insertFile.getName());
}
}
return logNode;
@@ -357,4 +357,12 @@ public class OverflowResource {
public ModificationFile getModificationFile() {
return modificationFile;
}
+
+ public OverflowIO getInsertIO() {
+ return insertIO;
+ }
+
+ public List<ChunkGroupMetaData> getAppendInsertMetadatas() {
+ return appendInsertMetadatas;
+ }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
index f1904c1..da00942 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
@@ -27,11 +27,9 @@ import org.apache.iotdb.db.engine.filenode.TsFileResource;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
@@ -40,28 +38,27 @@ import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
public class SeqTsFileRecoverPerformer {
private String insertFilePath;
- private String processorName;
+ private String logNodePrefix;
private FileSchema fileSchema;
private VersionController versionController;
private LogReplayer logReplayer;
- private IMemTable recoverMemTable;
private TsFileResource tsFileResource;
- public SeqTsFileRecoverPerformer(String insertFilePath, String processorName,
+ public SeqTsFileRecoverPerformer(String insertFilePath, String logNodePrefix,
FileSchema fileSchema, VersionController versionController,
TsFileResource currentTsFileResource) {
this.insertFilePath = insertFilePath;
- this.processorName = processorName;
+ this.logNodePrefix = logNodePrefix;
this.fileSchema = fileSchema;
this.versionController = versionController;
- this.recoverMemTable = new PrimitiveMemTable();
- this.logReplayer = new LogReplayer(processorName, insertFilePath, currentTsFileResource.getModFile(),
- versionController,
- currentTsFileResource, fileSchema, recoverMemTable);
this.tsFileResource = currentTsFileResource;
}
public void recover() throws ProcessorException {
+ IMemTable recoverMemTable = new PrimitiveMemTable();
+ this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
+ versionController,
+ tsFileResource, fileSchema, recoverMemTable);
File insertFile = new File(insertFilePath);
if (!insertFile.exists()) {
return;
@@ -79,7 +76,7 @@ public class SeqTsFileRecoverPerformer {
logReplayer.replayLogs();
MemTableFlushTask tableFlushTask = new MemTableFlushTask(restorableTsFileIOWriter,
- processorName, 0, (a,b) -> {});
+ logNodePrefix, 0, (a,b) -> {});
tableFlushTask.flushMemTable(fileSchema, recoverMemTable, versionController.nextVersion());
try {
@@ -91,7 +88,7 @@ public class SeqTsFileRecoverPerformer {
removeTruncatePosition(insertFile);
try {
- MultiFileLogNodeManager.getInstance().deleteNode(processorName + new File(insertFilePath).getName());
+ MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
} catch (IOException e) {
throw new ProcessorException(e);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java
new file mode 100644
index 0000000..d0a046c
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.writelog.recover;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushCallBack;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.overflow.io.OverflowIO;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
+import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+
+public class UnseqTsFileRecoverPerformer {
+ // Recovery helper: runs LogReplayer.replayLogs() against a fresh memtable, flushes it via overflowIO, then deletes the log node.
+ private String logNodePrefix; // prefix of the log node name; also passed to LogReplayer and MemTableFlushTask
+ private String insertFilePath; // its file name is the suffix of the log node name deleted in recover()
+ private ModificationFile modFile; // handed to LogReplayer unchanged
+ private VersionController versionController; // supplies nextVersion() for the flush
+ private FileSchema fileSchema; // passed to both LogReplayer and flushMemTable
+ private OverflowIO overflowIO; // writer that receives the flushed data and the trailing metadata
+ private List<ChunkGroupMetaData> appendInsertMetadatas; // stored by reference (not copied); flush() appends to it
+ // Stores the arguments as-is; nothing is replayed or written until recover() is called.
+ public UnseqTsFileRecoverPerformer(String logNodePrefix, String insertFilePath,
+ ModificationFile modFile,
+ VersionController versionController, FileSchema fileSchema,
+ OverflowIO overflowIO,
+ List<ChunkGroupMetaData> appendInsertMetadatas) {
+ this.logNodePrefix = logNodePrefix;
+ this.insertFilePath = insertFilePath;
+ this.modFile = modFile;
+ this.versionController = versionController;
+ this.fileSchema = fileSchema;
+ this.overflowIO = overflowIO;
+ this.appendInsertMetadatas = appendInsertMetadatas;
+ }
+ /** Replays logs, flushes the memtable, deletes the log node; any IOException is rethrown wrapped in ProcessorException. */
+ public void recover() throws ProcessorException {
+ IMemTable memTable = new PrimitiveMemTable();
+ LogReplayer replayer = new LogReplayer(logNodePrefix, insertFilePath,
+ modFile, versionController, null, fileSchema, memTable); // null fills the slot where SeqTsFileRecoverPerformer passes its TsFileResource
+ replayer.replayLogs(); // presumably fills memTable from the logs -- LogReplayer is not shown here, TODO confirm
+ try {
+ flush(memTable);
+ MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName()); // reached only if flush() did not throw
+ } catch (IOException e) {
+ throw new ProcessorException(e);
+ }
+ }
+ /** Writes memTable through overflowIO followed by a metadata trailer; does nothing when memTable is null or empty. */
+ public void flush(IMemTable memTable) throws IOException {
+ if (memTable != null && !memTable.isEmpty()) {
+ overflowIO.toTail();
+ long lastPosition = overflowIO.getPos(); // position before the flush; written into the trailer below
+ MemTableFlushTask tableFlushTask = new MemTableFlushTask(overflowIO, logNodePrefix, 0,
+ (a,b) -> {}); // no-op callback
+ tableFlushTask.flushMemTable(fileSchema, memTable, versionController.nextVersion());
+ // Trailer written below, in order: lastPosition as long bytes, the serialized TsDeviceMetadata, then its byte length as int bytes.
+ List<ChunkGroupMetaData> rowGroupMetaDatas = overflowIO.getChunkGroupMetaDatas();
+ appendInsertMetadatas.addAll(rowGroupMetaDatas); // must happen before clearRowGroupMetadatas() below if the getter returns a live list -- TODO confirm
+ if (!rowGroupMetaDatas.isEmpty()) {
+ overflowIO.getWriter().write(BytesUtils.longToBytes(lastPosition));
+ TsDeviceMetadata tsDeviceMetadata = new TsDeviceMetadata();
+ tsDeviceMetadata.setChunkGroupMetadataList(rowGroupMetaDatas);
+ long start = overflowIO.getPos();
+ tsDeviceMetadata.serializeTo(overflowIO.getOutputStream());
+ long end = overflowIO.getPos();
+ overflowIO.getWriter().write(BytesUtils.intToBytes((int) (end - start))); // NOTE(review): narrowing cast; assumes the serialized metadata fits in an int
+ // clear the meta-data of insert IO
+ overflowIO.clearRowGroupMetadatas();
+ }
+ }
+ }
+}