You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/02/13 08:05:11 UTC
[iotdb] branch master updated: [IOTDB-5506] Refactor RatisConsensus(#9029)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8a1eb28efa [IOTDB-5506] Refactor RatisConsensus(#9029)
8a1eb28efa is described below
commit 8a1eb28efa43f54da8f6a2105a19be7e7ca4f4da
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Mon Feb 13 16:05:03 2023 +0800
[IOTDB-5506] Refactor RatisConsensus(#9029)
---
.../org/apache/iotdb/consensus/common/Utils.java | 23 +++++++++++-----------
.../ratis/ApplicationStateMachineProxy.java | 9 +++------
.../apache/iotdb/consensus/ratis/RatisClient.java | 16 +++++++--------
.../iotdb/consensus/ratis/RatisConsensus.java | 19 ++++++++----------
.../iotdb/consensus/ratis/RequestMessage.java | 6 +++---
.../iotdb/consensus/ratis/ResponseMessage.java | 6 +++---
.../iotdb/consensus/ratis/SnapshotStorage.java | 9 ++++-----
7 files changed, 40 insertions(+), 48 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
index ddd17c6471..5803157ea5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Utils.java
@@ -31,7 +31,6 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
public class Utils {
private static final Logger logger = LoggerFactory.getLogger(Utils.class);
@@ -90,18 +89,18 @@ public class Utils {
public synchronized long getTotalFolderSize() {
final List<Path> latest = listAllRegularFilesRecursively(rootDir);
- final List<Path> incremental =
- latest.stream().filter(p -> !memorized.contains(p)).collect(Collectors.toList());
+ final long incremental =
+ latest.stream()
+ .filter(p -> !memorized.contains(p))
+ .mapToLong(p -> p.toFile().length())
+ .sum();
+ final long decremental =
+ memorized.stream()
+ .filter(p -> !latest.contains(p))
+ .mapToLong(p -> p.toFile().length())
+ .sum();
- final List<Path> decremental =
- memorized.stream().filter(p -> !latest.contains(p)).collect(Collectors.toList());
-
- totalSize += incremental.stream().mapToLong(p -> p.toFile().length()).sum();
- if (decremental.size() == memorized.size()) {
- totalSize = 0;
- } else {
- totalSize -= decremental.stream().mapToLong(p -> p.toFile().length()).sum();
- }
+ totalSize = totalSize + incremental - decremental;
memorized = latest;
return totalSize;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 10acca5386..2fd055baa7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -53,9 +53,6 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
private final IStateMachine applicationStateMachine;
private final IStateMachine.RetryPolicy retryPolicy;
-
- // Raft Storage sub dir for statemachine data, default (_sm)
- private File statemachineDir;
private final SnapshotStorage snapshotStorage;
private final RaftGroupId groupId;
@@ -77,7 +74,6 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
.startAndTransition(
() -> {
snapshotStorage.init(storage);
- this.statemachineDir = snapshotStorage.getStateMachineDir();
loadSnapshot(snapshotStorage.findLatestSnapshotDir());
});
}
@@ -110,7 +106,7 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
RaftProtos.LogEntryProto log = trx.getLogEntry();
updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
- IConsensusRequest applicationRequest = null;
+ IConsensusRequest applicationRequest;
// if this server is leader
// it will first try to obtain applicationRequest from transaction context
@@ -173,7 +169,8 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ logger.warn("{}: interrupted when waiting until system ready: {}", this, e);
+ Thread.currentThread().interrupt();
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
index 2900256016..9e870338f5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
@@ -38,14 +38,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
-public class RatisClient {
+class RatisClient {
private final Logger logger = LoggerFactory.getLogger(RatisClient.class);
private final RaftGroup serveGroup;
private final RaftClient raftClient;
private final ClientManager<RaftGroup, RatisClient> clientManager;
- public RatisClient(
+ RatisClient(
RaftGroup serveGroup,
RaftClient client,
ClientManager<RaftGroup, RatisClient> clientManager) {
@@ -54,7 +54,7 @@ public class RatisClient {
this.clientManager = clientManager;
}
- public RaftClient getRaftClient() {
+ RaftClient getRaftClient() {
return raftClient;
}
@@ -66,11 +66,11 @@ public class RatisClient {
}
}
- public void returnSelf() {
+ void returnSelf() {
clientManager.returnClient(serveGroup, this);
}
- public static class Factory extends BaseClientFactory<RaftGroup, RatisClient> {
+ static class Factory extends BaseClientFactory<RaftGroup, RatisClient> {
private final RaftProperties raftProperties;
private final RaftClientRpc clientRpc;
@@ -124,9 +124,9 @@ public class RatisClient {
private static class RatisRetryPolicy implements RetryPolicy {
private static final Logger logger = LoggerFactory.getLogger(RatisClient.class);
- RetryPolicy defaultPolicy;
+ private final RetryPolicy defaultPolicy;
- public RatisRetryPolicy(RatisConfig.Client config) {
+ RatisRetryPolicy(RatisConfig.Client config) {
defaultPolicy =
ExponentialBackoffRetry.newBuilder()
.setBaseSleepTime(
@@ -144,7 +144,7 @@ public class RatisClient {
if (event.getCause() instanceof IOException && !(event.getCause() instanceof RaftException)) {
// unexpected. may be caused by statemachine.
- logger.debug("raft client request failed and caught exception: ", event.getCause());
+ logger.info("raft client request failed and caught exception: ", event.getCause());
return NO_RETRY_ACTION;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 1375a88cca..1cac98dafb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -88,11 +88,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-/**
- * A multi-raft consensus implementation based on Ratis, currently still under development.
- *
- * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
- */
+/** A multi-raft consensus implementation based on Apache Ratis. */
class RatisConsensus implements IConsensus {
private final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
@@ -686,16 +682,14 @@ class RatisConsensus implements IConsensus {
private void triggerSnapshotByCustomize() {
- Iterable<RaftGroupId> groupIds = server.getGroupIds();
-
- for (RaftGroupId raftGroupId : groupIds) {
- File currentDir = null;
+ for (RaftGroupId raftGroupId : server.getGroupIds()) {
+ File currentDir;
try {
currentDir =
server.getDivision(raftGroupId).getRaftStorage().getStorageDir().getCurrentDir();
} catch (IOException e) {
- logger.warn("Get division failed: ", e);
+ logger.warn("{}: get division {} failed: ", this, raftGroupId, e);
continue;
}
@@ -709,7 +703,10 @@ class RatisConsensus implements IConsensus {
if (consensusGenericResponse.isSuccess()) {
logger.info("Raft group {} took snapshot successfully", raftGroupId);
} else {
- logger.warn("Raft group {} failed to take snapshot", raftGroupId);
+ logger.warn(
+ "Raft group {} failed to take snapshot due to {}",
+ raftGroupId,
+ consensusGenericResponse.getException());
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 2067a54e42..9eed308a91 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -24,17 +24,17 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
-public class RequestMessage implements Message {
+class RequestMessage implements Message {
private final IConsensusRequest actualRequest;
private volatile ByteString serializedContent;
- public RequestMessage(IConsensusRequest request) {
+ RequestMessage(IConsensusRequest request) {
this.actualRequest = request;
serializedContent = null;
}
- public IConsensusRequest getActualRequest() {
+ IConsensusRequest getActualRequest() {
return actualRequest;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
index 50903607d9..1eb4b7cb89 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java
@@ -26,7 +26,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ResponseMessage implements Message {
+class ResponseMessage implements Message {
/**
* This content holder may hold 1. TSStatus, which may be serialized when called getContent() 2.
@@ -37,12 +37,12 @@ public class ResponseMessage implements Message {
private volatile ByteString serializedData;
private final Logger logger = LoggerFactory.getLogger(ResponseMessage.class);
- public ResponseMessage(Object content) {
+ ResponseMessage(Object content) {
this.contentHolder = content;
this.serializedData = null;
}
- public Object getContentHolder() {
+ Object getContentHolder() {
return contentHolder;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index 0bbd970b84..c7b843f994 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -50,7 +50,6 @@ public class SnapshotStorage implements StateMachineStorage {
private static final String TMP_PREFIX = ".tmp.";
private File stateMachineDir;
private final RaftGroupId groupId;
-
private final ReentrantReadWriteLock snapshotCacheGuard = new ReentrantReadWriteLock();
private SnapshotInfo currentSnapshot = null;
@@ -176,20 +175,20 @@ public class SnapshotStorage implements StateMachineStorage {
}
}
- public File getStateMachineDir() {
+ File getStateMachineDir() {
return stateMachineDir;
}
- public File getSnapshotDir(String snapshotMetadata) {
+ File getSnapshotDir(String snapshotMetadata) {
return new File(getStateMachineDir().getAbsolutePath() + File.separator + snapshotMetadata);
}
- public File getSnapshotTmpDir(String snapshotMetadata) {
+ File getSnapshotTmpDir(String snapshotMetadata) {
return new File(
getStateMachineDir().getAbsolutePath() + File.separator + TMP_PREFIX + snapshotMetadata);
}
- public String getSnapshotTmpId(String snapshotMetadata) {
+ String getSnapshotTmpId(String snapshotMetadata) {
return TMP_PREFIX + snapshotMetadata;
}