You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/04/07 15:38:59 UTC
[iotdb] branch rel/1.1 updated: [IOTDB-5466] Refactor RaftLog disk monitor to avoid unnecessary snapshots (#9515) (#9554)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 893b79bf00 [IOTDB-5466] Refactor RaftLog disk monitor to avoid unnecessary snapshots (#9515) (#9554)
893b79bf00 is described below
commit 893b79bf001c51f91c93925443b947890fd43119
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Fri Apr 7 23:38:51 2023 +0800
[IOTDB-5466] Refactor RaftLog disk monitor to avoid unnecessary snapshots (#9515) (#9554)
---
.../org/apache/iotdb/consensus/common/Utils.java | 32 --------
.../iot/logdispatcher/IndexController.java | 2 +-
.../ratis/ApplicationStateMachineProxy.java | 1 +
.../iotdb/consensus/ratis/RatisConsensus.java | 35 +++++----
.../iotdb/consensus/ratis/ResponseMessage.java | 1 +
.../iotdb/consensus/ratis/SnapshotStorage.java | 1 +
.../ratis/metrics/IoTDBMetricRegistry.java | 2 +-
.../consensus/ratis/utils/RatisLogMonitor.java | 87 ++++++++++++++++++++++
.../iotdb/consensus/ratis/{ => utils}/Utils.java | 2 +-
.../iot/logdispatcher/IndexControllerTest.java | 2 +-
.../apache/iotdb/consensus/ratis/SnapshotTest.java | 1 +
.../apache/iotdb/consensus/ratis/UtilsTest.java | 1 +
12 files changed, 118 insertions(+), 49 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
index 5803157ea5..96d381949e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
@@ -74,36 +74,4 @@ public class Utils {
}
return allFiles;
}
-
- public static class MemorizedFileSizeCalc {
- private List<Path> memorized;
- private final File rootDir;
- private long totalSize;
-
- public MemorizedFileSizeCalc(File rootDir) {
- this.memorized = Collections.emptyList();
- this.rootDir = rootDir;
- this.totalSize = 0;
- }
-
- public synchronized long getTotalFolderSize() {
- final List<Path> latest = listAllRegularFilesRecursively(rootDir);
-
- final long incremental =
- latest.stream()
- .filter(p -> !memorized.contains(p))
- .mapToLong(p -> p.toFile().length())
- .sum();
- final long decremental =
- memorized.stream()
- .filter(p -> !latest.contains(p))
- .mapToLong(p -> p.toFile().length())
- .sum();
-
- totalSize = totalSize + incremental - decremental;
-
- memorized = latest;
- return totalSize;
- }
- }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index 56bc35c84b..d153ae7460 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.ratis.Utils;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 44516072c2..06d955a573 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.ratis.proto.RaftProtos;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 7cac28eb17..5d2b362acd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.Utils.MemorizedFileSizeCalc;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
@@ -52,6 +51,8 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
+import org.apache.iotdb.consensus.ratis.utils.RatisLogMonitor;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -95,7 +96,7 @@ import java.util.stream.Collectors;
/** A multi-raft consensus implementation based on Apache Ratis. */
class RatisConsensus implements IConsensus {
- private final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
+ private static final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
/** the unique net communication endpoint */
private final RaftPeer myself;
@@ -120,10 +121,11 @@ class RatisConsensus implements IConsensus {
private final ExecutorService addExecutor;
private final ScheduledExecutorService diskGuardian;
+ private final long triggerSnapshotThreshold;
private final RatisConfig config;
- private final ConcurrentHashMap<File, MemorizedFileSizeCalc> calcMap = new ConcurrentHashMap<>();
+ private final RatisLogMonitor monitor = new RatisLogMonitor();
private final RatisMetricSet ratisMetricSet;
private TConsensusGroupType consensusGroupType = null;
@@ -138,14 +140,15 @@ class RatisConsensus implements IConsensus {
properties, Collections.singletonList(new File(config.getStorageDir())));
GrpcConfigKeys.Server.setPort(properties, config.getThisNodeEndPoint().getPort());
- addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool("ratis-add");
- diskGuardian =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("ratis-bg-disk-guardian");
-
Utils.initRatisConfig(properties, config.getRatisConfig());
this.config = config.getRatisConfig();
this.ratisMetricSet = new RatisMetricSet();
+ this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize();
+ addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool("ratis-add");
+ diskGuardian =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("ratis-bg-disk-guardian");
+
clientManager =
new IClientManager.Factory<RaftGroup, RatisClient>()
.createClientManager(new RatisClientPoolFactory());
@@ -723,18 +726,24 @@ class RatisConsensus implements IConsensus {
continue;
}
- final long currentDirLength =
- calcMap.computeIfAbsent(currentDir, MemorizedFileSizeCalc::new).getTotalFolderSize();
- final long triggerSnapshotFileSize = config.getImpl().getTriggerSnapshotFileSize();
+ final long currentDirLength = monitor.updateAndGetDirectorySize(currentDir);
+
+ if (currentDirLength >= triggerSnapshotThreshold) {
+ final int filesCount = monitor.getFilesUnder(currentDir).size();
+ logger.info(
+ "{}: take snapshot for region {}, current dir size {}, {} files to be purged",
+ this,
+ raftGroupId,
+ currentDirLength,
+ filesCount);
- if (currentDirLength >= triggerSnapshotFileSize) {
- ConsensusGenericResponse consensusGenericResponse =
+ final ConsensusGenericResponse consensusGenericResponse =
triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
if (consensusGenericResponse.isSuccess()) {
logger.info("Raft group {} took snapshot successfully", raftGroupId);
} else {
logger.warn(
- "Raft group {} failed to take snapshot due to {}",
+ "Raft group {} failed to take snapshot due to",
raftGroupId,
consensusGenericResponse.getException());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
index 1eb4b7cb89..7a2e30dae8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index d93663b27a..9f9d281110 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.ratis.protocol.RaftGroupId;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/metrics/IoTDBMetricRegistry.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/metrics/IoTDBMetricRegistry.java
index 065809b0ea..3f196a467a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/metrics/IoTDBMetricRegistry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/metrics/IoTDBMetricRegistry.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.consensus.ratis.metrics;
-import org.apache.iotdb.consensus.ratis.Utils;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
new file mode 100644
index 0000000000..2482199928
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Monitoring Ratis RaftLog total size. It will memorize the state of all files in the last update
+ * and calculates the diff incrementally each run.
+ */
+public class RatisLogMonitor {
+ private static final Logger logger = LoggerFactory.getLogger(RatisLogMonitor.class);
+
+ /* whether the path denotes an open segment under active writing progress */
+ private static final Predicate<Path> isOpenSegment =
+ p -> p.toFile().getName().startsWith("log_inprogress");
+
+ private static final class DirectoryState {
+ private long size = 0;
+ private Set<Path> memorizedFiles = Collections.emptySet();
+
+ private void update(long size, Set<Path> latest) {
+ this.size = size;
+ this.memorizedFiles = latest;
+ }
+ }
+
+ private final HashMap<File, DirectoryState> directoryMap = new HashMap<>();
+
+ public long updateAndGetDirectorySize(File dir) {
+ final DirectoryState state = directoryMap.computeIfAbsent(dir, d -> new DirectoryState());
+ Set<Path> latest;
+ try (final Stream<Path> files = Files.list(dir.toPath())) {
+ latest = files.filter(isOpenSegment).collect(Collectors.toSet());
+ } catch (IOException e) {
+ logger.warn("{}: Error caught when listing files under {}:", this, dir, e);
+ // keep the files unchanged and return the size calculated last time
+ return state.size;
+ }
+ final long sizeDiff = diff(state.memorizedFiles, latest);
+ final long newSize = state.size + sizeDiff;
+ state.update(newSize, latest);
+ return newSize;
+ }
+
+ public Set<Path> getFilesUnder(File dir) {
+ return Collections.unmodifiableSet(directoryMap.get(dir).memorizedFiles);
+ }
+
+ private static long diff(Set<Path> old, Set<Path> latest) {
+ final long incremental = totalSize(latest.stream().filter(p -> !old.contains(p)));
+ final long decremental = totalSize(old.stream().filter(p -> !latest.contains(p)));
+ return incremental - decremental;
+ }
+
+ private static long totalSize(Stream<Path> files) {
+ return files.mapToLong(p -> p.toFile().length()).sum();
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
similarity index 99%
rename from consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index c40f1b511b..0a76426400 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.consensus.ratis;
+package org.apache.iotdb.consensus.ratis.utils;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index fff348516e..a34c6ca022 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.ratis.Utils;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index b2c3fae3ae..1a62e3c8b7 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServerConfigKeys;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
index ef2287f3a5..f536653a3a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.ratis.protocol.RaftGroupId;
import org.junit.Assert;