You are viewing a plain-text version of this content; the canonical (HTML) version, including the original link, is available in the Apache mailing-list archive.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/07/21 14:31:53 UTC

[incubator-iotdb] branch TyRecover updated: add some log and fix recover bug

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TyRecover
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/TyRecover by this push:
     new 09c7233  add some log and fix recover bug
     new 2e9b995  Merge pull request #1531 from zhanglingzhe0820/fix_TyRecover
09c7233 is described below

commit 09c723393e481f292d7e8ba2c704454890882e1d
Author: 张凌哲 <44...@qq.com>
AuthorDate: Tue Jul 21 22:26:27 2020 +0800

    add some log and fix recover bug
---
 .../iotdb/db/engine/flush/VmLogAnalyzer.java       |  3 +-
 .../db/engine/storagegroup/TsFileProcessor.java    | 43 ++++++++++++++--------
 2 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogAnalyzer.java
index 67612b6..4f96078 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/VmLogAnalyzer.java
@@ -54,8 +54,7 @@ public class VmLogAnalyzer {
   public void analyze() throws IOException {
     String currLine;
     try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) {
-      currLine = bufferedReader.readLine();
-      while (currLine != null) {
+      while ((currLine = bufferedReader.readLine()) != null) {
         switch (currLine) {
           case SOURCE_NAME:
             currLine = bufferedReader.readLine();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index ce1699e..ff09819 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -691,7 +691,11 @@ public class TsFileProcessor {
         }
         if (targetFile.getName().endsWith(TSFILE_SUFFIX)) {
           if (!isMergeFinished) {
-            writer.getIOWriterOut().truncate(offset - 1);
+            logger.info("{}: {} merge recover {} level vms to TsFile", storageGroupName,
+                tsFileResource.getTsFile().getName(), vmWriters.size());
+            if (offset > 0) {
+              writer.getIOWriterOut().truncate(offset - 1);
+            }
             VmMergeUtils.merge(writer, packVmWritersToSequenceList(vmWriters),
                 storageGroupName,
                 new VmLogger(tsFileResource.getTsFile().getParent(),
@@ -718,9 +722,16 @@ public class TsFileProcessor {
             if (deviceSet.isEmpty()) {
               Files.delete(targetFile.toPath());
             } else {
+              logger
+                  .info("{}: {} [Hot Compaction Recover] merge level-{}'s {} vms to next level vm",
+                      storageGroupName, tsFileResource.getTsFile().getName(), 0,
+                      vmWriters.get(0).size());
+              if (offset > 0) {
+                writer.getIOWriterOut().truncate(offset - 1);
+              }
               newVmWriter.getIOWriterOut().truncate(offset - 1);
               // vm files must be sequence, so we just have to find the first file
-              int startIndex = 0;
+              int startIndex;
               for (startIndex = 0; startIndex < vmWriters.get(level).size(); startIndex++) {
                 RestorableTsFileIOWriter levelVmWriter = vmWriters.get(level).get(startIndex);
                 if (levelVmWriter.getFile().getAbsolutePath()
@@ -844,17 +855,10 @@ public class TsFileProcessor {
           }
           logger.info("{}: [Hot Compaction] Start to merge total {} levels' vm to TsFile {}",
               storageGroupName, vmTsFileResources.size() + 1, tsFileResource.getTsFile().getName());
-          new Thread(() -> {
-            try {
-              TimeUnit.SECONDS.sleep(1);
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-            System.exit(1);
-          }).start();
           long startTimeMillis = System.currentTimeMillis();
           VmLogger vmLogger = new VmLogger(tsFileResource.getTsFile().getParent(),
               tsFileResource.getTsFile().getName());
+          vmLogger.logFile(TARGET_NAME, writer.getFile());
           flushAllVmToTsFile(vmWriters, vmTsFileResources, vmLogger);
           vmLogger.logMergeFinish();
           vmLogger.close();
@@ -864,9 +868,10 @@ public class TsFileProcessor {
           if (logFile.exists()) {
             Files.delete(logFile.toPath());
           }
-          logger.info("{}: [Hot Compaction] All vms are merged to TsFile {}, time consumption: {} ms",
-              storageGroupName, tsFileResource.getTsFile().getName(),
-              System.currentTimeMillis() - startTimeMillis);
+          logger
+              .info("{}: [Hot Compaction] All vms are merged to TsFile {}, time consumption: {} ms",
+                  storageGroupName, tsFileResource.getTsFile().getName(),
+                  System.currentTimeMillis() - startTimeMillis);
         }
         writer.mark();
         try {
@@ -1173,8 +1178,8 @@ public class TsFileProcessor {
         long vmPointNum = 0;
         // all flush to target file
         Map<Path, MeasurementSchema> pathMeasurementSchemaMap = new HashMap<>();
-        for (List<RestorableTsFileIOWriter> subVmWriters : vmMergeWriters) {
-          for (RestorableTsFileIOWriter vmWriter : subVmWriters) {
+        for (List<RestorableTsFileIOWriter> levelVmWriters : vmMergeWriters) {
+          for (RestorableTsFileIOWriter vmWriter : levelVmWriters) {
             Map<String, Map<String, List<ChunkMetadata>>> schemaMap = vmWriter
                 .getMetadatasForQuery();
             for (Entry<String, Map<String, List<ChunkMetadata>>> schemaMapEntry : schemaMap
@@ -1216,6 +1221,14 @@ public class TsFileProcessor {
               }
               File newVmFile = createNewVMFileWithLock(tsFileResource, i + 1);
               vmLogger.logFile(TARGET_NAME, newVmFile);
+              new Thread(() -> {
+                try {
+                  TimeUnit.SECONDS.sleep(1);
+                } catch (InterruptedException e) {
+                  e.printStackTrace();
+                }
+                System.exit(1);
+              }).start();
               logger.info("{}: {} [Hot Compaction] merge level-{}'s {} vms to next level vm",
                   storageGroupName, tsFileResource.getTsFile().getName(), i,
                   vmMergeTsFiles.get(i).size());