You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/27 00:55:47 UTC

[iotdb] branch master updated: [IOTDB-2474] Add progress logger when restarting IoTDB (#4979)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5a818  [IOTDB-2474] Add progress logger when restarting IoTDB (#4979)
ee5a818 is described below

commit ee5a818cc07cb9fcd806709d8d5ea4b4e166919e
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Thu Jan 27 08:54:57 2022 +0800

    [IOTDB-2474] Add progress logger when restarting IoTDB (#4979)
---
 .../resources/conf/iotdb-engine.properties         |  4 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++
 .../storagegroup/VirtualStorageGroupProcessor.java | 76 +++++++++++++++-------
 .../virtualSg/StorageGroupManager.java             | 13 +++-
 5 files changed, 85 insertions(+), 25 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 2502244..bb87b5e 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -319,6 +319,10 @@ timestamp_precision=ms
 # Datatype: int
 # virtual_storage_group_num = 1
 
+# the interval (in milliseconds) at which the recovery progress of each virtual storage group is logged when starting IoTDB
+# Datatype: long
+# recovery_log_interval_in_ms=5000
+
 ####################
 ### Memory Control Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 75d77d8..eef7468 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -783,6 +783,9 @@ public class IoTDBConfig {
   /** the number of virtual storage groups per user-defined storage group */
   private int virtualStorageGroupNum = 1;
 
+  /** interval in ms at which the recovery progress of each vsg is logged when starting IoTDB */
+  private long recoveryLogIntervalInMs = 5_000L;
+
   private String adminName = "root";
 
   private String adminPassword = "root";
@@ -2287,6 +2290,14 @@ public class IoTDBConfig {
     this.virtualStorageGroupNum = virtualStorageGroupNum;
   }
 
+  public long getRecoveryLogIntervalInMs() {
+    return recoveryLogIntervalInMs;
+  }
+
+  public void setRecoveryLogIntervalInMs(long recoveryLogIntervalInMs) {
+    this.recoveryLogIntervalInMs = recoveryLogIntervalInMs;
+  }
+
   public boolean isRpcAdvancedCompressionEnable() {
     return rpcAdvancedCompressionEnable;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f46671b..2029155 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -672,6 +672,12 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "virtual_storage_group_num", String.valueOf(conf.getVirtualStorageGroupNum()))));
 
+      conf.setRecoveryLogIntervalInMs(
+          Long.parseLong(
+              properties.getProperty(
+                  "recovery_log_interval_in_ms",
+                  String.valueOf(conf.getRecoveryLogIntervalInMs()))));
+
       conf.setConcurrentWindowEvaluationThread(
           Integer.parseInt(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 743e911..19efaf4 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -100,20 +100,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -459,13 +447,47 @@ public class VirtualStorageGroupProcessor {
     return ret;
   }
 
+  /** Tracks the recovery progress of this virtual storage group and logs it at intervals. */
+  private class RecoveryContext {
+    /** total number of files to be recovered */
+    private final long filesToRecoverNum;
+    /** log check is triggered when recoveredFilesNum grows by more than this (1% of total) */
+    private final long filesNumLogCheckTrigger;
+    /** number of already recovered files */
+    private long recoveredFilesNum;
+    /** time of the last progress log in ms, initially the creation time of this context */
+    private long lastLogTime;
+    /** value of recoveredFilesNum at the last log check, initially 0 */
+    private long lastLogCheckFilesNum;
+
+    public RecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
+      this.filesToRecoverNum = filesToRecoverNum;
+      this.recoveredFilesNum = recoveredFilesNum;
+      this.filesNumLogCheckTrigger = this.filesToRecoverNum / 100;
+      this.lastLogTime = System.currentTimeMillis();
+    }
+
+    /** Counts one more file and logs the rounded percentage once enough files and time passed. */
+    public void incrementRecoveredFilesNum() {
+      recoveredFilesNum++;
+      // check log only when 1% more files have been recovered
+      if (lastLogCheckFilesNum + filesNumLogCheckTrigger < recoveredFilesNum) {
+        lastLogCheckFilesNum = recoveredFilesNum;
+        // log only when the configured recovery log interval has elapsed since the last log
+        if (lastLogTime + config.getRecoveryLogIntervalInMs() < System.currentTimeMillis()) {
+          logger.info(
+              "The virtual storage group {}[{}] has recovered {}%, please wait a moment.",
+              logicalStorageGroupName,
+              virtualStorageGroupId,
+              Math.round(recoveredFilesNum * 100.0 / filesToRecoverNum));
+          lastLogTime = System.currentTimeMillis();
+        }
+      }
+    }
+  }
+
   /** recover from file */
   private void recover() throws StorageGroupProcessorException {
-    logger.info(
-        String.format(
-            "start recovering virtual storage group %s[%s]",
-            logicalStorageGroupName, virtualStorageGroupId));
-
     try {
       recoverInnerSpaceCompaction(true);
       recoverInnerSpaceCompaction(false);
@@ -493,15 +515,17 @@ public class VirtualStorageGroupProcessor {
 
       // split by partition so that we can find the last file of each partition and decide to
       // close it or not
+      RecoveryContext recoveryContext =
+          new RecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size(), 0);
       Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
           splitResourcesByPartition(tmpSeqTsFiles);
       Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
           splitResourcesByPartition(tmpUnseqTsFiles);
       for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
-        recoverTsFiles(value, true);
+        recoverTsFiles(value, recoveryContext, true);
       }
       for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
-        recoverTsFiles(value, false);
+        recoverTsFiles(value, recoveryContext, false);
       }
       for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
         long partitionNum = resource.getTimePartition();
@@ -541,9 +565,9 @@ public class VirtualStorageGroupProcessor {
     initCompaction();
 
     logger.info(
-        String.format(
-            "the virtual storage group %s[%s] is recovered successfully",
-            logicalStorageGroupName, virtualStorageGroupId));
+        "The virtual storage group {}[{}] is recovered successfully",
+        logicalStorageGroupName,
+        virtualStorageGroupId);
   }
 
   private void initCompaction() {
@@ -786,8 +810,12 @@ public class VirtualStorageGroupProcessor {
     }
   }
 
-  private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) throws IOException {
+  private void recoverTsFiles(List<TsFileResource> tsFiles, RecoveryContext context, boolean isSeq)
+      throws IOException {
     for (int i = 0; i < tsFiles.size(); i++) {
+      // update recovery context
+      context.incrementRecoveredFilesNum();
+
       TsFileResource tsFileResource = tsFiles.get(i);
       long timePartitionId = tsFileResource.getTimePartition();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index 50db463..749fe99 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Each storage group that set by users corresponds to a StorageGroupManager */
 public class StorageGroupManager {
@@ -64,6 +65,9 @@ public class StorageGroupManager {
    */
   private AtomicBoolean[] isVsgReady;
 
+  /** number of ready virtual storage group processors */
+  private AtomicInteger readyVsgNum;
+
   private AtomicBoolean isSettling = new AtomicBoolean();
 
   /** value of root.stats."root.sg".TOTAL_POINTS */
@@ -189,6 +193,7 @@ public class StorageGroupManager {
    */
   public void asyncRecover(
       IStorageGroupMNode storageGroupMNode, ExecutorService pool, List<Future<Void>> futures) {
+    readyVsgNum = new AtomicInteger(0);
     for (int i = 0; i < partitioner.getPartitionCount(); i++) {
       int cur = i;
       Callable<Void> recoverVsgTask =
@@ -204,13 +209,19 @@ public class StorageGroupManager {
                           String.valueOf(cur));
             } catch (StorageGroupProcessorException e) {
               logger.error(
-                  "failed to recover virtual storage group {}[{}]",
+                  "Failed to recover virtual storage group {}[{}]",
                   storageGroupMNode.getFullPath(),
                   cur,
                   e);
             }
+
             virtualStorageGroupProcessor[cur] = processor;
             isVsgReady[cur].set(true);
+            logger.info(
+                "Storage Group {} has been recovered {}/{}",
+                storageGroupMNode.getFullPath(),
+                readyVsgNum.incrementAndGet(),
+                partitioner.getPartitionCount());
             return null;
           };
       futures.add(pool.submit(recoverVsgTask));