You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/21 02:57:01 UTC

[incubator-iotdb] 01/01: recover for restart

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch hot_compaction
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 34651d01766fa7313b1071ae79d10f9579ffa3a9
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jul 21 10:56:49 2020 +0800

    recover for restart
---
 .../writelog/recover/TsFileRecoverPerformer.java   | 142 ++++++++++-----------
 1 file changed, 64 insertions(+), 78 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 1a146d6..b4b2cdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.writelog.recover;
 
 import static org.apache.iotdb.db.engine.flush.MemTableFlushTask.getFlushLogFile;
-import static org.apache.iotdb.db.engine.flush.VmLogger.isVMLoggerFileExist;
 import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
 
 import java.io.File;
@@ -33,6 +32,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -63,14 +63,14 @@ public class TsFileRecoverPerformer {
   private String filePath;
   private String logNodePrefix;
   private VersionController versionController;
-  private TsFileResource resource;
+  private TsFileResource tsFileResource;
   private boolean sequence;
   private boolean isLastFile;
 
   private List<List<TsFileResource>> vmTsFileResources;
 
   /**
-   * @param isLastFile whether this TsFile is the last file of its partition
+   * @param isLastFile        whether this TsFile is the last file of its partition
    * @param vmTsFileResources only last file could have non-empty vmTsFileResources
    */
   public TsFileRecoverPerformer(String logNodePrefix, VersionController versionController,
@@ -79,7 +79,7 @@ public class TsFileRecoverPerformer {
     this.filePath = currentTsFileResource.getTsFilePath();
     this.logNodePrefix = logNodePrefix;
     this.versionController = versionController;
-    this.resource = currentTsFileResource;
+    this.tsFileResource = currentTsFileResource;
     this.sequence = sequence;
     this.isLastFile = isLastFile;
     this.vmTsFileResources = vmTsFileResources;
@@ -109,16 +109,11 @@ public class TsFileRecoverPerformer {
       logger.error("TsFile {} is missing, will skip its recovery.", filePath);
       return null;
     }
-    for (List<File> subVmFileList : vmFileList) {
-      for (File vmFile : subVmFileList) {
-        if (!vmFile.exists()) {
-          logger.error("VMFile {} is missing, will skip its recovery.", vmFile.getPath());
-          return null;
-        }
-      }
-    }
+
     // remove corrupted part of the TsFile
     RestorableTsFileIOWriter restorableTsFileIOWriter;
+
+    // create RestorableTsFileIOWriter for all vmfiles in each level
     List<List<RestorableTsFileIOWriter>> vmRestorableTsFileIOWriterList = new ArrayList<>();
     try {
       restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
@@ -126,7 +121,10 @@ public class TsFileRecoverPerformer {
         vmRestorableTsFileIOWriterList.add(new ArrayList<>());
         for (int j = 0; j < vmTsFileResources.get(i).size(); j++) {
           file = vmFileList.get(i).get(j);
-          vmRestorableTsFileIOWriterList.get(i).add(new RestorableTsFileIOWriter(file));
+          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(file);
+          // recover vmfiles' metadata
+          recoverResourceFromWriter(writer, vmTsFileResources.get(i).get(j));
+          vmRestorableTsFileIOWriterList.get(i).add(writer);
         }
       }
     } catch (NotCompatibleTsFileException e) {
@@ -137,21 +135,11 @@ public class TsFileRecoverPerformer {
       throw new StorageGroupProcessorException(e);
     }
 
-    RestorableTsFileIOWriter lastRestorableTsFileIOWriter =
-        vmTsFileResources.isEmpty() ? restorableTsFileIOWriter
-            : vmRestorableTsFileIOWriterList.get(0)
-                .get(vmRestorableTsFileIOWriterList.get(0).size() - 1);
-
-    TsFileResource lastTsFileResource = vmTsFileResources.isEmpty() ? resource
-        : vmTsFileResources.get(0).get(vmTsFileResources.get(0).size() - 1);
-
-    boolean isComplete =
-        !lastRestorableTsFileIOWriter.hasCrashed() && !lastRestorableTsFileIOWriter.canWrite();
-
-    if (isComplete) {
-      // tsfile is complete, vmfile is never complete because it's canWrite() always return true.
+    // judge whether tsfile is complete
+    if (!restorableTsFileIOWriter.hasCrashed()) {
+      // if the tsfile is complete, then there are no vmfiles.
       try {
-        recoverResource(resource);
+        recoverResource(tsFileResource);
         return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
       } catch (IOException e) {
         throw new StorageGroupProcessorException(
@@ -159,61 +147,59 @@ public class TsFileRecoverPerformer {
                 + RESOURCE_SUFFIX + e);
       }
     } else {
-      if (!vmTsFileResources.isEmpty()) {
-        for (int i = 0; i < vmTsFileResources.size(); i++) {
-          for (int j = 0; j < vmTsFileResources.get(i).size(); j++) {
-            recoverResourceFromWriter(vmRestorableTsFileIOWriterList.get(i).get(j),
-                vmTsFileResources.get(i).get(j));
-          }
-        }
-        recoverResourceFromWriter(restorableTsFileIOWriter, resource);
-        boolean vmFileNotCrashed = !getFlushLogFile(restorableTsFileIOWriter).exists();
-        // if the last file in vmTsFileResources is not crashed
-        if (vmFileNotCrashed) {
-          try {
-            boolean tsFileNotCrashed = !isVMLoggerFileExist(restorableTsFileIOWriter);
-            // tsfile is not crash
-            if (tsFileNotCrashed) {
+      // tsfile has crashed
+      // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
+      // map must be updated first to avoid duplicated insertion
+      recoverResourceFromWriter(restorableTsFileIOWriter, tsFileResource);
+    }
+
+    // If vm is disabled, walTargetWriter points to the tsfile.
+    // If vm is enabled and the flush log exists, walTargetWriter points to the last vmfile of level 0.
+    // If vm is enabled and the flush log does not exist, walTargetWriter stays null.
+    RestorableTsFileIOWriter walTargetWriter = null;
+    TsFileResource walTargetResource = null;
 
-              // if wal exists, we should open a new vmfile to replay it
-              File newVmFile = resource.getProcessor().createNewVMFile(resource, 0);
-              TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
-              RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
-              if (redoLogs(newVMWriter, newVmTsFileResource)) {
-                vmTsFileResources.get(0).add(newVmTsFileResource);
-                vmRestorableTsFileIOWriterList.get(0).add(newVMWriter);
-              } else {
-                Files.delete(newVmFile.toPath());
-              }
-            } else {
-              IMemTable recoverMemTable = new PrimitiveMemTable();
-              recoverMemTable.setVersion(versionController.nextVersion());
-              LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath,
-                  resource.getModFile(),
-                  versionController, resource, recoverMemTable, sequence);
-              logReplayer.replayLogs();
-            }
-            // clean logs
-            MultiFileLogNodeManager.getInstance().deleteNode(
-                logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
-            updateTsFileResource();
-            return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
-          } catch (IOException e) {
-            throw new StorageGroupProcessorException(
-                "recover the resource file failed: " + filePath
-                    + RESOURCE_SUFFIX + e);
-          }
+    boolean enableVM = IoTDBDescriptor.getInstance().getConfig().isEnableVm();
+    boolean flushLogExist = getFlushLogFile(restorableTsFileIOWriter).exists();
+
+    if (!enableVM) {
+      walTargetWriter = restorableTsFileIOWriter;
+      walTargetResource = tsFileResource;
+    } else if (flushLogExist) {
+      walTargetWriter = vmRestorableTsFileIOWriterList.get(0)
+          .get(vmRestorableTsFileIOWriterList.get(0).size() - 1);
+      walTargetResource = vmTsFileResources.get(0).get(vmTsFileResources.get(0).size() - 1);
+    }
+
+
+    if (walTargetWriter == null) {
+      try {
+        // if wal exists, we should open a new vmfile to replay it
+        File newVmFile = tsFileResource.getProcessor().createNewVMFile(tsFileResource, 0);
+        TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
+        RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
+        if (redoLogs(newVMWriter, newVmTsFileResource)) {
+          // recover metadata for new vmfile
+          recoverResourceFromWriter(newVMWriter, newVmTsFileResource);
+          vmTsFileResources.get(0).add(newVmTsFileResource);
+          vmRestorableTsFileIOWriterList.get(0).add(newVMWriter);
+          // clean logs
+          MultiFileLogNodeManager.getInstance().deleteNode(
+              logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
+          updateTsFileResource();
+        } else {
+          Files.delete(newVmFile.toPath());
         }
-      } else {
-        // tsfile has crashed
-        // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
-        // map must be updated first to avoid duplicated insertion
-        recoverResourceFromWriter(lastRestorableTsFileIOWriter, lastTsFileResource);
+        return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
+      } catch (IOException e) {
+        throw new StorageGroupProcessorException(
+            "recover the resource file failed: " + filePath
+                + RESOURCE_SUFFIX + e);
       }
     }
 
     // redo logs
-    redoLogs(lastRestorableTsFileIOWriter, lastTsFileResource);
+    redoLogs(walTargetWriter, walTargetResource);
 
     // clean logs
     try {
@@ -232,8 +218,8 @@ public class TsFileRecoverPerformer {
         for (Entry<String, Integer> entry : tsFileResource.getDeviceToIndexMap().entrySet()) {
           String device = entry.getKey();
           int index = entry.getValue();
-          resource.updateStartTime(device, tsFileResource.getStartTime(index));
-          resource.updateEndTime(device, tsFileResource.getEndTime(index));
+          this.tsFileResource.updateStartTime(device, tsFileResource.getStartTime(index));
+          this.tsFileResource.updateEndTime(device, tsFileResource.getEndTime(index));
         }
       }
     }