You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/21 03:19:40 UTC

[incubator-iotdb] branch TyRecover created (now aa857af)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch TyRecover
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at aa857af  fix recover bug

This branch includes the following new commits:

     new aa857af  fix recover bug

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix recover bug

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TyRecover
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit aa857afc286296a625d209f42bcd6585c0f81c02
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jul 21 11:18:46 2020 +0800

    fix recover bug
---
 .../writelog/recover/TsFileRecoverPerformer.java   | 145 +++++++++------------
 1 file changed, 65 insertions(+), 80 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index d75de25..7e5f304 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -20,8 +20,6 @@
 package org.apache.iotdb.db.writelog.recover;
 
 import static org.apache.iotdb.db.engine.flush.MemTableFlushTask.getFlushLogFile;
-import static org.apache.iotdb.db.engine.flush.VmLogger.isVMLoggerFileExist;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileProcessor.createNewVMFile;
 import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
 
 import java.io.File;
@@ -34,6 +32,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -64,7 +63,7 @@ public class TsFileRecoverPerformer {
   private String filePath;
   private String logNodePrefix;
   private VersionController versionController;
-  private TsFileResource resource;
+  private TsFileResource tsFileResource;
   private boolean sequence;
   private boolean isLastFile;
 
@@ -80,7 +79,7 @@ public class TsFileRecoverPerformer {
     this.filePath = currentTsFileResource.getTsFilePath();
     this.logNodePrefix = logNodePrefix;
     this.versionController = versionController;
-    this.resource = currentTsFileResource;
+    this.tsFileResource = currentTsFileResource;
     this.sequence = sequence;
     this.isLastFile = isLastFile;
     this.vmTsFileResources = vmTsFileResources;
@@ -110,16 +109,11 @@ public class TsFileRecoverPerformer {
       logger.error("TsFile {} is missing, will skip its recovery.", filePath);
       return null;
     }
-    for (List<File> subVmFileList : vmFileList) {
-      for (File vmFile : subVmFileList) {
-        if (!vmFile.exists()) {
-          logger.error("VMFile {} is missing, will skip its recovery.", vmFile.getPath());
-          return null;
-        }
-      }
-    }
+
     // remove corrupted part of the TsFile
     RestorableTsFileIOWriter restorableTsFileIOWriter;
+
+    // create RestorableTsFileIOWriter for all vmfiles in each level
     List<List<RestorableTsFileIOWriter>> vmRestorableTsFileIOWriterList = new ArrayList<>();
     try {
       restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
@@ -127,7 +121,10 @@ public class TsFileRecoverPerformer {
         vmRestorableTsFileIOWriterList.add(new ArrayList<>());
         for (int j = 0; j < vmTsFileResources.get(i).size(); j++) {
           file = vmFileList.get(i).get(j);
-          vmRestorableTsFileIOWriterList.get(i).add(new RestorableTsFileIOWriter(file));
+          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(file);
+          // recover vmfiles' metadata
+          recoverResourceFromWriter(writer, vmTsFileResources.get(i).get(j));
+          vmRestorableTsFileIOWriterList.get(i).add(writer);
         }
       }
     } catch (NotCompatibleTsFileException e) {
@@ -138,21 +135,11 @@ public class TsFileRecoverPerformer {
       throw new StorageGroupProcessorException(e);
     }
 
-    RestorableTsFileIOWriter lastRestorableTsFileIOWriter =
-        vmTsFileResources.isEmpty() ? restorableTsFileIOWriter
-            : vmRestorableTsFileIOWriterList.get(0)
-                .get(vmRestorableTsFileIOWriterList.get(0).size() - 1);
-
-    TsFileResource lastTsFileResource = vmTsFileResources.isEmpty() ? resource
-        : vmTsFileResources.get(0).get(vmTsFileResources.get(0).size() - 1);
-
-    boolean isComplete =
-        !lastRestorableTsFileIOWriter.hasCrashed() && !lastRestorableTsFileIOWriter.canWrite();
-
-    if (isComplete) {
-      // tsfile is complete, vmfile is never complete because it's canWrite() always return true.
+    // judge whether tsfile is complete
+    if (!restorableTsFileIOWriter.hasCrashed()) {
+      // if tsfile is complete, then there are no vmfiles.
       try {
-        recoverResource(resource);
+        recoverResource(tsFileResource);
         return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
       } catch (IOException e) {
         throw new StorageGroupProcessorException(
@@ -160,61 +147,59 @@ public class TsFileRecoverPerformer {
                 + RESOURCE_SUFFIX + e);
       }
     } else {
-      if (!vmTsFileResources.isEmpty()) {
-        for (int i = 0; i < vmTsFileResources.size(); i++) {
-          for (int j = 0; j < vmTsFileResources.get(i).size(); j++) {
-            recoverResourceFromWriter(vmRestorableTsFileIOWriterList.get(i).get(j),
-                vmTsFileResources.get(i).get(j));
-          }
-        }
-        recoverResourceFromWriter(restorableTsFileIOWriter, resource);
-        boolean vmFileNotCrashed = !getFlushLogFile(restorableTsFileIOWriter).exists();
-        // if the last file in vmTsFileResources is not crashed
-        if (vmFileNotCrashed) {
-          try {
-            boolean tsFileNotCrashed = !isVMLoggerFileExist(restorableTsFileIOWriter);
-            // tsfile is not crash
-            if (tsFileNotCrashed) {
+      // tsfile has crashed
+      // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
+      // map must be updated first to avoid duplicated insertion
+      recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource);
+    }
+
+    // If the vm is not enabled, the walTargetWriter points to the tsfile.
+    // If the vm is enabled and the flush log exists, the walTargetWriter points to the vm of the flush log.
+    // If the vm is enabled and the flush log does not exist, the walTargetWriter is null.
+    RestorableTsFileIOWriter walTargetWriter = null;
+    TsFileResource walTargetResource = null;
 
-              // if wal exists, we should open a new vmfile to replay it
-              File newVmFile = createNewVMFile(resource, 0);
-              TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
-              RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
-              if (redoLogs(newVMWriter, newVmTsFileResource)) {
-                vmTsFileResources.get(0).add(newVmTsFileResource);
-                vmRestorableTsFileIOWriterList.get(0).add(newVMWriter);
-              } else {
-                Files.delete(newVmFile.toPath());
-              }
-            } else {
-              IMemTable recoverMemTable = new PrimitiveMemTable();
-              recoverMemTable.setVersion(versionController.nextVersion());
-              LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath,
-                  resource.getModFile(),
-                  versionController, resource, recoverMemTable, sequence);
-              logReplayer.replayLogs();
-            }
-            // clean logs
-            MultiFileLogNodeManager.getInstance().deleteNode(
-                logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
-            updateTsFileResource();
-            return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
-          } catch (IOException e) {
-            throw new StorageGroupProcessorException(
-                "recover the resource file failed: " + filePath
-                    + RESOURCE_SUFFIX + e);
-          }
+    boolean enableVM = IoTDBDescriptor.getInstance().getConfig().isEnableVm();
+    boolean flushLogExist = getFlushLogFile(restorableTsFileIOWriter).exists();
+
+    if (!enableVM) {
+      walTargetWriter = restorableTsFileIOWriter;
+      walTargetResource = tsFileResource;
+    } else if (flushLogExist) {
+      walTargetWriter = vmRestorableTsFileIOWriterList.get(0)
+          .get(vmRestorableTsFileIOWriterList.get(0).size() - 1);
+      walTargetResource = vmTsFileResources.get(0).get(vmTsFileResources.get(0).size() - 1);
+    }
+
+
+    if (walTargetWriter == null) {
+      try {
+        // if wal exists, we should open a new vmfile to replay it
+        File newVmFile = tsFileResource.getProcessor().createNewVMFile(tsFileResource, 0);
+        TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
+        RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
+        if (redoLogs(newVMWriter, newVmTsFileResource)) {
+          // recover metadata for new vmfile
+          recoverResourceFromWriter(newVMWriter, newVmTsFileResource);
+          vmTsFileResources.get(0).add(newVmTsFileResource);
+          vmRestorableTsFileIOWriterList.get(0).add(newVMWriter);
+          // clean logs
+          MultiFileLogNodeManager.getInstance().deleteNode(
+              logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+          updateTsFileResource();
+        } else {
+          Files.delete(newVmFile.toPath());
         }
-      } else {
-        // tsfile has crashed
-        // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
-        // map must be updated first to avoid duplicated insertion
-        recoverResourceFromWriter(lastRestorableTsFileIOWriter, lastTsFileResource);
+        return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
+      } catch (IOException e) {
+        throw new StorageGroupProcessorException(
+            "recover the resource file failed: " + filePath
+                + RESOURCE_SUFFIX + e);
       }
     }
 
     // redo logs
-    redoLogs(lastRestorableTsFileIOWriter, lastTsFileResource);
+    redoLogs(walTargetWriter, walTargetResource);
 
     // clean logs
     try {
@@ -229,12 +214,12 @@ public class TsFileRecoverPerformer {
 
   private void updateTsFileResource() {
     for (List<TsFileResource> subTsFileResources : vmTsFileResources) {
-      for (TsFileResource tsFileResource : subTsFileResources) {
-        for (Entry<String, Integer> entry : tsFileResource.getDeviceToIndexMap().entrySet()) {
+      for (TsFileResource resource : subTsFileResources) {
+        for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
           String device = entry.getKey();
           int index = entry.getValue();
-          resource.updateStartTime(device, tsFileResource.getStartTime(index));
-          resource.updateEndTime(device, tsFileResource.getEndTime(index));
+          tsFileResource.updateStartTime(device, resource.getStartTime(index));
+          tsFileResource.updateEndTime(device, resource.getEndTime(index));
         }
       }
     }