You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/27 00:55:47 UTC
[iotdb] branch master updated: [IOTDB-2474] Add progress logger when restarting IoTDB (#4979)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ee5a818 [IOTDB-2474] Add progress logger when restarting IoTDB (#4979)
ee5a818 is described below
commit ee5a818cc07cb9fcd806709d8d5ea4b4e166919e
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Thu Jan 27 08:54:57 2022 +0800
[IOTDB-2474] Add progress logger when restarting IoTDB (#4979)
---
.../resources/conf/iotdb-engine.properties | 4 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++
.../storagegroup/VirtualStorageGroupProcessor.java | 76 +++++++++++++++-------
.../virtualSg/StorageGroupManager.java | 13 +++-
5 files changed, 85 insertions(+), 25 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 2502244..bb87b5e 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -319,6 +319,10 @@ timestamp_precision=ms
# Datatype: int
# virtual_storage_group_num = 1
+# the interval (in ms) at which the recovery progress of each virtual storage group (vsg) is logged when starting IoTDB
+# Datatype: long
+# recovery_log_interval_in_ms=5000
+
####################
### Memory Control Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 75d77d8..eef7468 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -783,6 +783,9 @@ public class IoTDBConfig {
/** the number of virtual storage groups per user-defined storage group */
private int virtualStorageGroupNum = 1;
+ /** the interval to log recover progress of each vsg when starting iotdb */
+ private long recoveryLogIntervalInMs = 5_000L;
+
private String adminName = "root";
private String adminPassword = "root";
@@ -2287,6 +2290,14 @@ public class IoTDBConfig {
this.virtualStorageGroupNum = virtualStorageGroupNum;
}
+ public long getRecoveryLogIntervalInMs() {
+ return recoveryLogIntervalInMs;
+ }
+
+ public void setRecoveryLogIntervalInMs(long recoveryLogIntervalInMs) {
+ this.recoveryLogIntervalInMs = recoveryLogIntervalInMs;
+ }
+
public boolean isRpcAdvancedCompressionEnable() {
return rpcAdvancedCompressionEnable;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f46671b..2029155 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -672,6 +672,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"virtual_storage_group_num", String.valueOf(conf.getVirtualStorageGroupNum()))));
+ conf.setRecoveryLogIntervalInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "recovery_log_interval_in_ms",
+ String.valueOf(conf.getRecoveryLogIntervalInMs()))));
+
conf.setConcurrentWindowEvaluationThread(
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 743e911..19efaf4 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -100,20 +100,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -459,13 +447,47 @@ public class VirtualStorageGroupProcessor {
return ret;
}
+ /** tracks recovery progress of this virtual storage group and rate-limits the progress log */
+ private class RecoveryContext {
+ /** number of files to be recovered */
+ private final long filesToRecoverNum;
+ /** when the change of recoveredFilesNum exceeds this (1% of all files), log check is triggered */
+ private final long filesNumLogCheckTrigger;
+ /** number of files counted so far via incrementRecoveredFilesNum() */
+ private long recoveredFilesNum;
+ /** wall-clock time (ms) of the last progress log, initialised to the creation time */
+ private long lastLogTime;
+ /** value of recoveredFilesNum at the last log check */
+ private long lastLogCheckFilesNum;
+
+ public RecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
+ this.filesToRecoverNum = filesToRecoverNum;
+ this.recoveredFilesNum = recoveredFilesNum;
+ this.filesNumLogCheckTrigger = this.filesToRecoverNum / 100;
+ this.lastLogTime = System.currentTimeMillis();
+ this.lastLogCheckFilesNum = 0;
+ }
+
+ public void incrementRecoveredFilesNum() {
+ recoveredFilesNum++;
+ // check log only when 1% more files have been counted since the last check
+ if (lastLogCheckFilesNum + filesNumLogCheckTrigger < recoveredFilesNum) {
+ lastLogCheckFilesNum = recoveredFilesNum;
+ // log only when the time since the last log exceeds the configured recovery log interval
+ if (lastLogTime + config.getRecoveryLogIntervalInMs() < System.currentTimeMillis()) {
+ logger.info(
+ "The virtual storage group {}[{}] has recovered {}%, please wait a moment.",
+ logicalStorageGroupName,
+ virtualStorageGroupId,
+ recoveredFilesNum * 100.0 / filesToRecoverNum); // scaled to percent to match "{}%"
+ lastLogTime = System.currentTimeMillis();
+ }
+ }
+ }
+ }
+
/** recover from file */
private void recover() throws StorageGroupProcessorException {
- logger.info(
- String.format(
- "start recovering virtual storage group %s[%s]",
- logicalStorageGroupName, virtualStorageGroupId));
-
try {
recoverInnerSpaceCompaction(true);
recoverInnerSpaceCompaction(false);
@@ -493,15 +515,17 @@ public class VirtualStorageGroupProcessor {
// split by partition so that we can find the last file of each partition and decide to
// close it or not
+ RecoveryContext recoveryContext =
+ new RecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size(), 0);
Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
splitResourcesByPartition(tmpSeqTsFiles);
Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
splitResourcesByPartition(tmpUnseqTsFiles);
for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
- recoverTsFiles(value, true);
+ recoverTsFiles(value, recoveryContext, true);
}
for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
- recoverTsFiles(value, false);
+ recoverTsFiles(value, recoveryContext, false);
}
for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
long partitionNum = resource.getTimePartition();
@@ -541,9 +565,9 @@ public class VirtualStorageGroupProcessor {
initCompaction();
logger.info(
- String.format(
- "the virtual storage group %s[%s] is recovered successfully",
- logicalStorageGroupName, virtualStorageGroupId));
+ "The virtual storage group {}[{}] is recovered successfully",
+ logicalStorageGroupName,
+ virtualStorageGroupId);
}
private void initCompaction() {
@@ -786,8 +810,12 @@ public class VirtualStorageGroupProcessor {
}
}
- private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) throws IOException {
+ private void recoverTsFiles(List<TsFileResource> tsFiles, RecoveryContext context, boolean isSeq)
+ throws IOException {
for (int i = 0; i < tsFiles.size(); i++) {
+ // update recovery context
+ context.incrementRecoveredFilesNum();
+
TsFileResource tsFileResource = tsFiles.get(i);
long timePartitionId = tsFileResource.getTimePartition();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index 50db463..749fe99 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/** Each storage group that set by users corresponds to a StorageGroupManager */
public class StorageGroupManager {
@@ -64,6 +65,9 @@ public class StorageGroupManager {
*/
private AtomicBoolean[] isVsgReady;
+ /** number of ready virtual storage group processors */
+ private AtomicInteger readyVsgNum;
+
private AtomicBoolean isSettling = new AtomicBoolean();
/** value of root.stats."root.sg".TOTAL_POINTS */
@@ -189,6 +193,7 @@ public class StorageGroupManager {
*/
public void asyncRecover(
IStorageGroupMNode storageGroupMNode, ExecutorService pool, List<Future<Void>> futures) {
+ readyVsgNum = new AtomicInteger(0);
for (int i = 0; i < partitioner.getPartitionCount(); i++) {
int cur = i;
Callable<Void> recoverVsgTask =
@@ -204,13 +209,19 @@ public class StorageGroupManager {
String.valueOf(cur));
} catch (StorageGroupProcessorException e) {
logger.error(
- "failed to recover virtual storage group {}[{}]",
+ "Failed to recover virtual storage group {}[{}]",
storageGroupMNode.getFullPath(),
cur,
e);
}
+
virtualStorageGroupProcessor[cur] = processor;
isVsgReady[cur].set(true);
+ logger.info(
+ "Storage Group {} has been recovered {}/{}",
+ storageGroupMNode.getFullPath(),
+ readyVsgNum.incrementAndGet(),
+ partitioner.getPartitionCount());
return null;
};
futures.add(pool.submit(recoverVsgTask));