You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/20 03:38:53 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add recover performer for overflow

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new fe205c6  add recover performer for overflow
fe205c6 is described below

commit fe205c6a4fb9de9b8c966a45767e908d67072214
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jun 20 11:36:49 2019 +0800

    add recover performer for overflow
---
 .../db/engine/overflow/io/OverflowProcessor.java   |  20 ++--
 .../db/engine/overflow/io/OverflowResource.java    |  10 +-
 .../recover/SeqTsFileRecoverPerformer.java         |  21 ++---
 .../recover/UnseqTsFileRecoverPerformer.java       | 102 +++++++++++++++++++++
 4 files changed, 127 insertions(+), 26 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 4042654..43f2700 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -66,6 +66,7 @@ import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.db.writelog.recover.LogReplayer;
+import org.apache.iotdb.db.writelog.recover.UnseqTsFileRecoverPerformer;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -194,19 +195,12 @@ public class OverflowProcessor extends Processor {
       throw new ProcessorException(e);
     }
 
-    IMemTable memTable = new PrimitiveMemTable();
-    LogReplayer replayer = new LogReplayer(processorName, workResource.getInsertFilePath(),
-        workResource.getModificationFile(), versionController, null, fileSchema,
-        memTable);
-    replayer.replayLogs();
-    flushTask("recover asyncFlush", memTable, 0, (a,b) -> {});
-    try {
-      WriteLogNode node = MultiFileLogNodeManager.getInstance().getNode(
-          workResource.logNodePrefix() + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
-      node.delete();
-    } catch (IOException e) {
-      throw new ProcessorException(e);
-    }
+    UnseqTsFileRecoverPerformer recoverPerformer =
+        new UnseqTsFileRecoverPerformer(workResource.logNodePrefix(),
+            workResource.getInsertFilePath(),
+            workResource.getModificationFile(), versionController, fileSchema,
+            workResource.getInsertIO(),workResource.getAppendInsertMetadatas());
+    recoverPerformer.recover();
   }
 
   private String[] clearFile(String[] subFilePaths) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index 859fff0..8722db2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -344,7 +344,7 @@ public class OverflowResource {
     if (logNode == null) {
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
         logNode = MultiFileLogNodeManager.getInstance().getNode(
-            logNodePrefix() + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
+            logNodePrefix() + insertFile.getName());
       }
     }
     return logNode;
@@ -357,4 +357,12 @@ public class OverflowResource {
   public ModificationFile getModificationFile() {
     return modificationFile;
   }
+
+  public OverflowIO getInsertIO() {
+    return insertIO;
+  }
+
+  public List<ChunkGroupMetaData> getAppendInsertMetadatas() {
+    return appendInsertMetadatas;
+  }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
index f1904c1..da00942 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
@@ -27,11 +27,9 @@ import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
@@ -40,28 +38,27 @@ import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
 public class SeqTsFileRecoverPerformer {
 
   private String insertFilePath;
-  private String processorName;
+  private String logNodePrefix;
   private FileSchema fileSchema;
   private VersionController versionController;
   private LogReplayer logReplayer;
-  private IMemTable recoverMemTable;
   private TsFileResource tsFileResource;
 
-  public SeqTsFileRecoverPerformer(String insertFilePath, String processorName,
+  public SeqTsFileRecoverPerformer(String insertFilePath, String logNodePrefix,
       FileSchema fileSchema, VersionController versionController,
       TsFileResource currentTsFileResource) {
     this.insertFilePath = insertFilePath;
-    this.processorName = processorName;
+    this.logNodePrefix = logNodePrefix;
     this.fileSchema = fileSchema;
     this.versionController = versionController;
-    this.recoverMemTable = new PrimitiveMemTable();
-    this.logReplayer = new LogReplayer(processorName, insertFilePath, currentTsFileResource.getModFile(),
-        versionController,
-        currentTsFileResource, fileSchema, recoverMemTable);
     this.tsFileResource = currentTsFileResource;
   }
 
   public void recover() throws ProcessorException {
+    IMemTable recoverMemTable = new PrimitiveMemTable();
+    this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
+        versionController,
+        tsFileResource, fileSchema, recoverMemTable);
     File insertFile = new File(insertFilePath);
     if (!insertFile.exists()) {
       return;
@@ -79,7 +76,7 @@ public class SeqTsFileRecoverPerformer {
     logReplayer.replayLogs();
 
     MemTableFlushTask tableFlushTask = new MemTableFlushTask(restorableTsFileIOWriter,
-        processorName, 0, (a,b) -> {});
+        logNodePrefix, 0, (a,b) -> {});
     tableFlushTask.flushMemTable(fileSchema, recoverMemTable, versionController.nextVersion());
 
     try {
@@ -91,7 +88,7 @@ public class SeqTsFileRecoverPerformer {
     removeTruncatePosition(insertFile);
 
     try {
-      MultiFileLogNodeManager.getInstance().deleteNode(processorName + new File(insertFilePath).getName());
+      MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
     } catch (IOException e) {
       throw new ProcessorException(e);
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java
new file mode 100644
index 0000000..d0a046c
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.writelog.recover;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushCallBack;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.overflow.io.OverflowIO;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
+import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+
+public class UnseqTsFileRecoverPerformer {
+
+  private String logNodePrefix;
+  private String insertFilePath;
+  private ModificationFile modFile;
+  private VersionController versionController;
+  private FileSchema fileSchema;
+  private OverflowIO overflowIO;
+  private List<ChunkGroupMetaData> appendInsertMetadatas;
+
+  public UnseqTsFileRecoverPerformer(String logNodePrefix, String insertFilePath,
+      ModificationFile modFile,
+      VersionController versionController, FileSchema fileSchema,
+      OverflowIO overflowIO,
+      List<ChunkGroupMetaData> appendInsertMetadatas) {
+    this.logNodePrefix = logNodePrefix;
+    this.insertFilePath = insertFilePath;
+    this.modFile = modFile;
+    this.versionController = versionController;
+    this.fileSchema = fileSchema;
+    this.overflowIO = overflowIO;
+    this.appendInsertMetadatas = appendInsertMetadatas;
+  }
+
+  public void recover() throws ProcessorException {
+    IMemTable memTable = new PrimitiveMemTable();
+    LogReplayer replayer = new LogReplayer(logNodePrefix, insertFilePath,
+        modFile, versionController, null, fileSchema, memTable);
+    replayer.replayLogs();
+    try {
+      flush(memTable);
+      MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
+    } catch (IOException e) {
+      throw new ProcessorException(e);
+    }
+  }
+
+  public void flush(IMemTable memTable) throws IOException {
+    if (memTable != null && !memTable.isEmpty()) {
+      overflowIO.toTail();
+      long lastPosition = overflowIO.getPos();
+      MemTableFlushTask tableFlushTask = new MemTableFlushTask(overflowIO, logNodePrefix, 0,
+          (a,b) -> {});
+      tableFlushTask.flushMemTable(fileSchema, memTable, versionController.nextVersion());
+
+      List<ChunkGroupMetaData> rowGroupMetaDatas = overflowIO.getChunkGroupMetaDatas();
+      appendInsertMetadatas.addAll(rowGroupMetaDatas);
+      if (!rowGroupMetaDatas.isEmpty()) {
+        overflowIO.getWriter().write(BytesUtils.longToBytes(lastPosition));
+        TsDeviceMetadata tsDeviceMetadata = new TsDeviceMetadata();
+        tsDeviceMetadata.setChunkGroupMetadataList(rowGroupMetaDatas);
+        long start = overflowIO.getPos();
+        tsDeviceMetadata.serializeTo(overflowIO.getOutputStream());
+        long end = overflowIO.getPos();
+        overflowIO.getWriter().write(BytesUtils.intToBytes((int) (end - start)));
+        // clear the meta-data of insert IO
+        overflowIO.clearRowGroupMetadatas();
+      }
+    }
+  }
+}