You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/04/07 15:38:59 UTC

[iotdb] branch rel/1.1 updated: [IOTDB-5466] Refactor RaftLog disk monitor to avoid unnecessary snapshots (#9515) (#9554)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 893b79bf00 [IOTDB-5466] Refactor RaftLog disk monitor to avoid unnecessary snapshots (#9515) (#9554)
893b79bf00 is described below

commit 893b79bf001c51f91c93925443b947890fd43119
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Fri Apr 7 23:38:51 2023 +0800

    [IOTDB-5466] Refactor RaftLog disk monitor to avoid unnecessary snapshots (#9515) (#9554)
---
 .../org/apache/iotdb/consensus/common/Utils.java   | 32 --------
 .../iot/logdispatcher/IndexController.java         |  2 +-
 .../ratis/ApplicationStateMachineProxy.java        |  1 +
 .../iotdb/consensus/ratis/RatisConsensus.java      | 35 +++++----
 .../iotdb/consensus/ratis/ResponseMessage.java     |  1 +
 .../iotdb/consensus/ratis/SnapshotStorage.java     |  1 +
 .../ratis/metrics/IoTDBMetricRegistry.java         |  2 +-
 .../consensus/ratis/utils/RatisLogMonitor.java     | 87 ++++++++++++++++++++++
 .../iotdb/consensus/ratis/{ => utils}/Utils.java   |  2 +-
 .../iot/logdispatcher/IndexControllerTest.java     |  2 +-
 .../apache/iotdb/consensus/ratis/SnapshotTest.java |  1 +
 .../apache/iotdb/consensus/ratis/UtilsTest.java    |  1 +
 12 files changed, 118 insertions(+), 49 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
index 5803157ea5..96d381949e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
@@ -74,36 +74,4 @@ public class Utils {
     }
     return allFiles;
   }
-
-  public static class MemorizedFileSizeCalc {
-    private List<Path> memorized;
-    private final File rootDir;
-    private long totalSize;
-
-    public MemorizedFileSizeCalc(File rootDir) {
-      this.memorized = Collections.emptyList();
-      this.rootDir = rootDir;
-      this.totalSize = 0;
-    }
-
-    public synchronized long getTotalFolderSize() {
-      final List<Path> latest = listAllRegularFilesRecursively(rootDir);
-
-      final long incremental =
-          latest.stream()
-              .filter(p -> !memorized.contains(p))
-              .mapToLong(p -> p.toFile().length())
-              .sum();
-      final long decremental =
-          memorized.stream()
-              .filter(p -> !latest.contains(p))
-              .mapToLong(p -> p.toFile().length())
-              .sum();
-
-      totalSize = totalSize + incremental - decremental;
-
-      memorized = latest;
-      return totalSize;
-    }
-  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index 56bc35c84b..d153ae7460 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.ratis.Utils;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 44516072c2..06d955a573 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.ratis.proto.RaftProtos;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 7cac28eb17..5d2b362acd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.Utils.MemorizedFileSizeCalc;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
@@ -52,6 +51,8 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
+import org.apache.iotdb.consensus.ratis.utils.RatisLogMonitor;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -95,7 +96,7 @@ import java.util.stream.Collectors;
 /** A multi-raft consensus implementation based on Apache Ratis. */
 class RatisConsensus implements IConsensus {
 
-  private final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
+  private static final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
 
   /** the unique net communication endpoint */
   private final RaftPeer myself;
@@ -120,10 +121,11 @@ class RatisConsensus implements IConsensus {
 
   private final ExecutorService addExecutor;
   private final ScheduledExecutorService diskGuardian;
+  private final long triggerSnapshotThreshold;
 
   private final RatisConfig config;
 
-  private final ConcurrentHashMap<File, MemorizedFileSizeCalc> calcMap = new ConcurrentHashMap<>();
+  private final RatisLogMonitor monitor = new RatisLogMonitor();
 
   private final RatisMetricSet ratisMetricSet;
   private TConsensusGroupType consensusGroupType = null;
@@ -138,14 +140,15 @@ class RatisConsensus implements IConsensus {
         properties, Collections.singletonList(new File(config.getStorageDir())));
     GrpcConfigKeys.Server.setPort(properties, config.getThisNodeEndPoint().getPort());
 
-    addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool("ratis-add");
-    diskGuardian =
-        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("ratis-bg-disk-guardian");
-
     Utils.initRatisConfig(properties, config.getRatisConfig());
     this.config = config.getRatisConfig();
     this.ratisMetricSet = new RatisMetricSet();
 
+    this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize();
+    addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool("ratis-add");
+    diskGuardian =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("ratis-bg-disk-guardian");
+
     clientManager =
         new IClientManager.Factory<RaftGroup, RatisClient>()
             .createClientManager(new RatisClientPoolFactory());
@@ -723,18 +726,24 @@ class RatisConsensus implements IConsensus {
         continue;
       }
 
-      final long currentDirLength =
-          calcMap.computeIfAbsent(currentDir, MemorizedFileSizeCalc::new).getTotalFolderSize();
-      final long triggerSnapshotFileSize = config.getImpl().getTriggerSnapshotFileSize();
+      final long currentDirLength = monitor.updateAndGetDirectorySize(currentDir);
+
+      if (currentDirLength >= triggerSnapshotThreshold) {
+        final int filesCount = monitor.getFilesUnder(currentDir).size();
+        logger.info(
+            "{}: take snapshot for region {}, current dir size {}, {} files to be purged",
+            this,
+            raftGroupId,
+            currentDirLength,
+            filesCount);
 
-      if (currentDirLength >= triggerSnapshotFileSize) {
-        ConsensusGenericResponse consensusGenericResponse =
+        final ConsensusGenericResponse consensusGenericResponse =
             triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
         if (consensusGenericResponse.isSuccess()) {
           logger.info("Raft group {} took snapshot successfully", raftGroupId);
         } else {
           logger.warn(
-              "Raft group {} failed to take snapshot due to {}",
+              "Raft group {} failed to take snapshot due to",
               raftGroupId,
               consensusGenericResponse.getException());
         }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
index 1eb4b7cb89..7a2e30dae8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index d93663b27a..9f9d281110 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.ratis.protocol.RaftGroupId;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/metrics/IoTDBMetricRegistry.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/metrics/IoTDBMetricRegistry.java
index 065809b0ea..3f196a467a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/metrics/IoTDBMetricRegistry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/metrics/IoTDBMetricRegistry.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.consensus.ratis.metrics;
 
-import org.apache.iotdb.consensus.ratis.Utils;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
new file mode 100644
index 0000000000..2482199928
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Monitoring Ratis RaftLog total size. It will memorize the state of all files in the last update
+ * and calculates the diff incrementally each run.
+ */
+public class RatisLogMonitor {
+  private static final Logger logger = LoggerFactory.getLogger(RatisLogMonitor.class);
+
+  /* whether the path denotes an open segment under active writing progress */
+  private static final Predicate<Path> isOpenSegment =
+      p -> p.toFile().getName().startsWith("log_inprogress");
+
+  private static final class DirectoryState {
+    private long size = 0;
+    private Set<Path> memorizedFiles = Collections.emptySet();
+
+    private void update(long size, Set<Path> latest) {
+      this.size = size;
+      this.memorizedFiles = latest;
+    }
+  }
+
+  private final HashMap<File, DirectoryState> directoryMap = new HashMap<>();
+
+  public long updateAndGetDirectorySize(File dir) {
+    final DirectoryState state = directoryMap.computeIfAbsent(dir, d -> new DirectoryState());
+    Set<Path> latest;
+    try (final Stream<Path> files = Files.list(dir.toPath())) {
+      latest = files.filter(isOpenSegment).collect(Collectors.toSet());
+    } catch (IOException e) {
+      logger.warn("{}: Error caught when listing files under {}:", this, dir, e);
+      // keep the files unchanged and return the size calculated last time
+      return state.size;
+    }
+    final long sizeDiff = diff(state.memorizedFiles, latest);
+    final long newSize = state.size + sizeDiff;
+    state.update(newSize, latest);
+    return newSize;
+  }
+
+  public Set<Path> getFilesUnder(File dir) {
+    return Collections.unmodifiableSet(directoryMap.get(dir).memorizedFiles);
+  }
+
+  private static long diff(Set<Path> old, Set<Path> latest) {
+    final long incremental = totalSize(latest.stream().filter(p -> !old.contains(p)));
+    final long decremental = totalSize(old.stream().filter(p -> !latest.contains(p)));
+    return incremental - decremental;
+  }
+
+  private static long totalSize(Stream<Path> files) {
+    return files.mapToLong(p -> p.toFile().length()).sum();
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
similarity index 99%
rename from consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index c40f1b511b..0a76426400 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.consensus.ratis;
+package org.apache.iotdb.consensus.ratis.utils;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index fff348516e..a34c6ca022 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.ratis.Utils;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index b2c3fae3ae..1a62e3c8b7 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServerConfigKeys;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
index ef2287f3a5..f536653a3a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
 
 import org.apache.ratis.protocol.RaftGroupId;
 import org.junit.Assert;