You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/02/13 08:05:11 UTC

[iotdb] branch master updated: [IOTDB-5506] Refactor RatisConsensus(#9029)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a1eb28efa [IOTDB-5506] Refactor RatisConsensus(#9029)
8a1eb28efa is described below

commit 8a1eb28efa43f54da8f6a2105a19be7e7ca4f4da
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Mon Feb 13 16:05:03 2023 +0800

    [IOTDB-5506] Refactor RatisConsensus(#9029)
---
 .../org/apache/iotdb/consensus/common/Utils.java   | 23 +++++++++++-----------
 .../ratis/ApplicationStateMachineProxy.java        |  9 +++------
 .../apache/iotdb/consensus/ratis/RatisClient.java  | 16 +++++++--------
 .../iotdb/consensus/ratis/RatisConsensus.java      | 19 ++++++++----------
 .../iotdb/consensus/ratis/RequestMessage.java      |  6 +++---
 .../iotdb/consensus/ratis/ResponseMessage.java     |  6 +++---
 .../iotdb/consensus/ratis/SnapshotStorage.java     |  9 ++++-----
 7 files changed, 40 insertions(+), 48 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
index ddd17c6471..5803157ea5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
@@ -31,7 +31,6 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class Utils {
   private static final Logger logger = LoggerFactory.getLogger(Utils.class);
@@ -90,18 +89,18 @@ public class Utils {
     public synchronized long getTotalFolderSize() {
       final List<Path> latest = listAllRegularFilesRecursively(rootDir);
 
-      final List<Path> incremental =
-          latest.stream().filter(p -> !memorized.contains(p)).collect(Collectors.toList());
+      final long incremental =
+          latest.stream()
+              .filter(p -> !memorized.contains(p))
+              .mapToLong(p -> p.toFile().length())
+              .sum();
+      final long decremental =
+          memorized.stream()
+              .filter(p -> !latest.contains(p))
+              .mapToLong(p -> p.toFile().length())
+              .sum();
 
-      final List<Path> decremental =
-          memorized.stream().filter(p -> !latest.contains(p)).collect(Collectors.toList());
-
-      totalSize += incremental.stream().mapToLong(p -> p.toFile().length()).sum();
-      if (decremental.size() == memorized.size()) {
-        totalSize = 0;
-      } else {
-        totalSize -= decremental.stream().mapToLong(p -> p.toFile().length()).sum();
-      }
+      totalSize = totalSize + incremental - decremental;
 
       memorized = latest;
       return totalSize;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 10acca5386..2fd055baa7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -53,9 +53,6 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
   private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
   private final IStateMachine applicationStateMachine;
   private final IStateMachine.RetryPolicy retryPolicy;
-
-  // Raft Storage sub dir for statemachine data, default (_sm)
-  private File statemachineDir;
   private final SnapshotStorage snapshotStorage;
   private final RaftGroupId groupId;
 
@@ -77,7 +74,6 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
         .startAndTransition(
             () -> {
               snapshotStorage.init(storage);
-              this.statemachineDir = snapshotStorage.getStateMachineDir();
               loadSnapshot(snapshotStorage.findLatestSnapshotDir());
             });
   }
@@ -110,7 +106,7 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
     RaftProtos.LogEntryProto log = trx.getLogEntry();
     updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
 
-    IConsensusRequest applicationRequest = null;
+    IConsensusRequest applicationRequest;
 
     // if this server is leader
     // it will first try to obtain applicationRequest from transaction context
@@ -173,7 +169,8 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
       try {
         TimeUnit.SECONDS.sleep(60);
       } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+        logger.warn("{}: interrupted when waiting until system ready: {}", this, e);
+        Thread.currentThread().interrupt();
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
index 2900256016..9e870338f5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
@@ -38,14 +38,14 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
-public class RatisClient {
+class RatisClient {
 
   private final Logger logger = LoggerFactory.getLogger(RatisClient.class);
   private final RaftGroup serveGroup;
   private final RaftClient raftClient;
   private final ClientManager<RaftGroup, RatisClient> clientManager;
 
-  public RatisClient(
+  RatisClient(
       RaftGroup serveGroup,
       RaftClient client,
       ClientManager<RaftGroup, RatisClient> clientManager) {
@@ -54,7 +54,7 @@ public class RatisClient {
     this.clientManager = clientManager;
   }
 
-  public RaftClient getRaftClient() {
+  RaftClient getRaftClient() {
     return raftClient;
   }
 
@@ -66,11 +66,11 @@ public class RatisClient {
     }
   }
 
-  public void returnSelf() {
+  void returnSelf() {
     clientManager.returnClient(serveGroup, this);
   }
 
-  public static class Factory extends BaseClientFactory<RaftGroup, RatisClient> {
+  static class Factory extends BaseClientFactory<RaftGroup, RatisClient> {
 
     private final RaftProperties raftProperties;
     private final RaftClientRpc clientRpc;
@@ -124,9 +124,9 @@ public class RatisClient {
   private static class RatisRetryPolicy implements RetryPolicy {
 
     private static final Logger logger = LoggerFactory.getLogger(RatisClient.class);
-    RetryPolicy defaultPolicy;
+    private final RetryPolicy defaultPolicy;
 
-    public RatisRetryPolicy(RatisConfig.Client config) {
+    RatisRetryPolicy(RatisConfig.Client config) {
       defaultPolicy =
           ExponentialBackoffRetry.newBuilder()
               .setBaseSleepTime(
@@ -144,7 +144,7 @@ public class RatisClient {
 
       if (event.getCause() instanceof IOException && !(event.getCause() instanceof RaftException)) {
         // unexpected. may be caused by statemachine.
-        logger.debug("raft client request failed and caught exception: ", event.getCause());
+        logger.info("raft client request failed and caught exception: ", event.getCause());
         return NO_RETRY_ACTION;
       }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 1375a88cca..1cac98dafb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -88,11 +88,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-/**
- * A multi-raft consensus implementation based on Ratis, currently still under development.
- *
- * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
- */
+/** A multi-raft consensus implementation based on Apache Ratis. */
 class RatisConsensus implements IConsensus {
 
   private final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
@@ -686,16 +682,14 @@ class RatisConsensus implements IConsensus {
 
   private void triggerSnapshotByCustomize() {
 
-    Iterable<RaftGroupId> groupIds = server.getGroupIds();
-
-    for (RaftGroupId raftGroupId : groupIds) {
-      File currentDir = null;
+    for (RaftGroupId raftGroupId : server.getGroupIds()) {
+      File currentDir;
 
       try {
         currentDir =
             server.getDivision(raftGroupId).getRaftStorage().getStorageDir().getCurrentDir();
       } catch (IOException e) {
-        logger.warn("Get division failed: ", e);
+        logger.warn("{}: get division {} failed: ", this, raftGroupId, e);
         continue;
       }
 
@@ -709,7 +703,10 @@ class RatisConsensus implements IConsensus {
         if (consensusGenericResponse.isSuccess()) {
           logger.info("Raft group {} took snapshot successfully", raftGroupId);
         } else {
-          logger.warn("Raft group {} failed to take snapshot", raftGroupId);
+          logger.warn(
+              "Raft group {} failed to take snapshot due to {}",
+              raftGroupId,
+              consensusGenericResponse.getException());
         }
       }
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 2067a54e42..9eed308a91 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -24,17 +24,17 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
-public class RequestMessage implements Message {
+class RequestMessage implements Message {
 
   private final IConsensusRequest actualRequest;
   private volatile ByteString serializedContent;
 
-  public RequestMessage(IConsensusRequest request) {
+  RequestMessage(IConsensusRequest request) {
     this.actualRequest = request;
     serializedContent = null;
   }
 
-  public IConsensusRequest getActualRequest() {
+  IConsensusRequest getActualRequest() {
     return actualRequest;
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
index 50903607d9..1eb4b7cb89 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
@@ -26,7 +26,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ResponseMessage implements Message {
+class ResponseMessage implements Message {
 
   /**
    * This content holder may hold 1. TSStatus, which may be serialized when called getContent() 2.
@@ -37,12 +37,12 @@ public class ResponseMessage implements Message {
   private volatile ByteString serializedData;
   private final Logger logger = LoggerFactory.getLogger(ResponseMessage.class);
 
-  public ResponseMessage(Object content) {
+  ResponseMessage(Object content) {
     this.contentHolder = content;
     this.serializedData = null;
   }
 
-  public Object getContentHolder() {
+  Object getContentHolder() {
     return contentHolder;
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index 0bbd970b84..c7b843f994 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -50,7 +50,6 @@ public class SnapshotStorage implements StateMachineStorage {
   private static final String TMP_PREFIX = ".tmp.";
   private File stateMachineDir;
   private final RaftGroupId groupId;
-
   private final ReentrantReadWriteLock snapshotCacheGuard = new ReentrantReadWriteLock();
   private SnapshotInfo currentSnapshot = null;
 
@@ -176,20 +175,20 @@ public class SnapshotStorage implements StateMachineStorage {
     }
   }
 
-  public File getStateMachineDir() {
+  File getStateMachineDir() {
     return stateMachineDir;
   }
 
-  public File getSnapshotDir(String snapshotMetadata) {
+  File getSnapshotDir(String snapshotMetadata) {
     return new File(getStateMachineDir().getAbsolutePath() + File.separator + snapshotMetadata);
   }
 
-  public File getSnapshotTmpDir(String snapshotMetadata) {
+  File getSnapshotTmpDir(String snapshotMetadata) {
     return new File(
         getStateMachineDir().getAbsolutePath() + File.separator + TMP_PREFIX + snapshotMetadata);
   }
 
-  public String getSnapshotTmpId(String snapshotMetadata) {
+  String getSnapshotTmpId(String snapshotMetadata) {
     return TMP_PREFIX + snapshotMetadata;
   }