You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/04 11:09:56 UTC
[iotdb] branch expr_vgraft updated: tmp save
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_vgraft by this push:
new 31cdac29ac tmp save
31cdac29ac is described below
commit 31cdac29ac40a8fbbbf21d06af4550dba10cf73a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue Oct 4 19:09:51 2022 +0800
tmp save
---
.../iotdb/cluster/config/ClusterDescriptor.java | 8 +-
.../iotdb/cluster/expr/vgraft/KeyManager.java | 25 +--
.../cluster/expr/vgraft/TrustValueHolder.java | 80 +++++-----
.../iotdb/cluster/log/FragmentedLogDispatcher.java | 3 +-
.../iotdb/cluster/log/IndirectLogDispatcher.java | 15 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 33 ++--
.../apache/iotdb/cluster/log/VotingLogList.java | 67 ++++----
.../cluster/log/appender/BlockingLogAppender.java | 5 +-
.../log/appender/SlidingWindowLogAppender.java | 5 +-
.../iotdb/cluster/log/applier/DataLogApplier.java | 1 -
.../manage/FilePartitionedSnapshotLogManager.java | 9 +-
.../log/manage/MetaSingleSnapshotLogManager.java | 1 -
.../server/handlers/caller/LogCatchUpHandler.java | 5 +-
.../iotdb/cluster/server/member/RaftMember.java | 173 ++++++++-------------
.../cluster/server/service/BaseAsyncService.java | 3 +-
.../cluster/server/service/BaseSyncService.java | 3 +-
.../cluster/server/service/DataAsyncService.java | 4 +-
.../cluster/server/service/DataGroupEngine.java | 40 ++---
.../server/service/DataGroupServiceImpls.java | 7 +-
.../cluster/server/service/DataSyncService.java | 4 +-
.../cluster/server/service/MetaAsyncService.java | 3 +-
.../cluster/server/service/MetaSyncService.java | 7 +-
.../threadpool/WrappedThreadPoolExecutor.java | 10 +-
23 files changed, 245 insertions(+), 266 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index b3af782f2b..562cff9c07 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -386,15 +386,11 @@ public class ClusterDescriptor {
config.setUseVGRaft(
Boolean.parseBoolean(
- properties.getProperty(
- "use_vg_raft",
- String.valueOf(config.isUseVGRaft()))));
+ properties.getProperty("use_vg_raft", String.valueOf(config.isUseVGRaft()))));
config.setUseCRaft(
Boolean.parseBoolean(
- properties.getProperty(
- "use_c_raft",
- String.valueOf(config.isUseCRaft()))));
+ properties.getProperty("use_c_raft", String.valueOf(config.isUseCRaft()))));
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/KeyManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/KeyManager.java
index 67faaec11e..7b2af5ad6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/KeyManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/KeyManager.java
@@ -1,5 +1,8 @@
package org.apache.iotdb.cluster.expr.vgraft;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
@@ -13,16 +16,12 @@ import java.security.SignatureException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
public class KeyManager {
public static KeyManager INSTANCE = new KeyManager();
- private KeyManager() {
-
- }
+ private KeyManager() {}
private KeyPairGenerator keygen;
private SecureRandom sRandom;
@@ -42,8 +41,10 @@ public class KeyManager {
dsa.update(ClusterUtils.nodeToString(thisNode).getBytes(StandardCharsets.UTF_8));
nodeSignature = dsa.sign();
dummySignature = ClusterUtils.nodeToString(thisNode).getBytes(StandardCharsets.UTF_8);
- } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException |
- SignatureException e) {
+ } catch (NoSuchAlgorithmException
+ | NoSuchProviderException
+ | InvalidKeyException
+ | SignatureException e) {
throw new RuntimeException(e);
}
}
@@ -59,13 +60,15 @@ public class KeyManager {
dsa = Signature.getInstance("SHA1withDSA", "SUN");
dsa.initVerify(keyPair.getPublic());
dsa.verify(nodeSignature);
- } catch (NoSuchAlgorithmException | NoSuchProviderException | SignatureException |
- InvalidKeyException e) {
+ } catch (NoSuchAlgorithmException
+ | NoSuchProviderException
+ | SignatureException
+ | InvalidKeyException e) {
throw new RuntimeException(e);
}
- return Arrays.equals(ClusterUtils.nodeToString(node).getBytes(StandardCharsets.UTF_8),
- signature);
+ return Arrays.equals(
+ ClusterUtils.nodeToString(node).getBytes(StandardCharsets.UTF_8), signature);
}
public boolean verifyNodeSignature(Node node, ByteBuffer buffer) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/TrustValueHolder.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/TrustValueHolder.java
index d8669faa31..806290fa6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/TrustValueHolder.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/TrustValueHolder.java
@@ -1,23 +1,5 @@
package org.apache.iotdb.cluster.expr.vgraft;
-import java.nio.charset.StandardCharsets;
-import java.security.InvalidKeyException;
-import java.security.KeyPair;
-import java.security.KeyPairGenerator;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.SecureRandom;
-import java.security.Signature;
-import java.security.SignatureException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -25,12 +7,22 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
+
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
public class TrustValueHolder {
private static final Logger logger = LoggerFactory.getLogger(TrustValueHolder.class);
@@ -39,7 +31,6 @@ public class TrustValueHolder {
private final Map<Node, Long> trustValueMap = new ConcurrentHashMap<>();
private final RaftMember member;
-
public TrustValueHolder(RaftMember member) {
this.member = member;
}
@@ -86,17 +77,19 @@ public class TrustValueHolder {
TimeUnit.MILLISECONDS)) {
return verifiers;
} else {
- return nodeTrustValueList.subList(0, M).stream().map(Entry::getKey)
+ return nodeTrustValueList.subList(0, M).stream()
+ .map(Entry::getKey)
.collect(Collectors.toList());
}
} catch (InterruptedException e) {
- return nodeTrustValueList.subList(0, M).stream().map(Entry::getKey)
+ return nodeTrustValueList.subList(0, M).stream()
+ .map(Entry::getKey)
.collect(Collectors.toList());
}
}
- private AsyncMethodCallback<Void> getPingCallback(Entry<Node, Long> entry, int M,
- List<Node> verifiers, CountDownLatch countDownLatch) {
+ private AsyncMethodCallback<Void> getPingCallback(
+ Entry<Node, Long> entry, int M, List<Node> verifiers, CountDownLatch countDownLatch) {
return new AsyncMethodCallback<Void>() {
@Override
public void onComplete(Void unused) {
@@ -115,28 +108,37 @@ public class TrustValueHolder {
};
}
- private void pingVerifiersSync(List<Map.Entry<Node, Long>> nodeTrustValueList, int M,
- List<Node> verifiers, CountDownLatch countDownLatch) {
+ private void pingVerifiersSync(
+ List<Map.Entry<Node, Long>> nodeTrustValueList,
+ int M,
+ List<Node> verifiers,
+ CountDownLatch countDownLatch) {
for (Map.Entry<Node, Long> entry : nodeTrustValueList) {
AsyncMethodCallback<Void> callback = getPingCallback(entry, M, verifiers, countDownLatch);
- member.getSerialToParallelPool().submit(() -> {
- Client syncClient = member.getSyncClient(entry.getKey());
- try {
- syncClient.ping();
- callback.onComplete(null);
- } catch (TException e) {
- callback.onError(e);
- } finally {
- ClientUtils.putBackSyncClient(syncClient);
- }
- });
+ member
+ .getSerialToParallelPool()
+ .submit(
+ () -> {
+ Client syncClient = member.getSyncClient(entry.getKey());
+ try {
+ syncClient.ping();
+ callback.onComplete(null);
+ } catch (TException e) {
+ callback.onError(e);
+ } finally {
+ ClientUtils.putBackSyncClient(syncClient);
+ }
+ });
}
}
- private void pingVerifiersAsync(List<Map.Entry<Node, Long>> nodeTrustValueList, int M,
- List<Node> verifiers, CountDownLatch countDownLatch) {
+ private void pingVerifiersAsync(
+ List<Map.Entry<Node, Long>> nodeTrustValueList,
+ int M,
+ List<Node> verifiers,
+ CountDownLatch countDownLatch) {
for (Map.Entry<Node, Long> entry : nodeTrustValueList) {
AsyncMethodCallback<Void> callback = getPingCallback(entry, M, verifiers, countDownLatch);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index 4a08476869..3a77d15eb0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -19,15 +19,14 @@
package org.apache.iotdb.cluster.log;
-import java.util.Map.Entry;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-
import org.apache.iotdb.tsfile.utils.Pair;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 1665edbb58..0f99bb9fe7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.log;
-import java.util.concurrent.ArrayBlockingQueue;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -29,9 +28,9 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.WeightedList;
-
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.tsfile.utils.Pair;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +40,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -87,8 +87,15 @@ public class IndirectLogDispatcher extends LogDispatcher {
for (int i = 0; i < bindingThreadNum; i++) {
for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
- executorServices.computeIfAbsent(pair.left, n -> IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-" + member.getName() + "-" + ClusterUtils.nodeToString(pair.left)))
+ executorServices
+ .computeIfAbsent(
+ pair.left,
+ n ->
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ "LogDispatcher-"
+ + member.getName()
+ + "-"
+ + ClusterUtils.nodeToString(pair.left)))
.submit(newDispatcherThread(pair.left, pair.right));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 7032ef6e52..12e9a2927e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -41,8 +41,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
import org.apache.iotdb.tsfile.utils.Pair;
+
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -103,8 +103,15 @@ public class LogDispatcher {
for (int i = 0; i < bindingThreadNum; i++) {
for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
- executorServices.computeIfAbsent(pair.left, n -> IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-" + member.getName() + "-" + ClusterUtils.nodeToString(pair.left)))
+ executorServices
+ .computeIfAbsent(
+ pair.left,
+ n ->
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ "LogDispatcher-"
+ + member.getName()
+ + "-"
+ + ClusterUtils.nodeToString(pair.left)))
.submit(newDispatcherThread(pair.left, pair.right));
}
}
@@ -124,7 +131,6 @@ public class LogDispatcher {
return newRequest;
}
-
private boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
long waitStart = System.currentTimeMillis();
@@ -146,6 +152,7 @@ public class LogDispatcher {
return nodeLogQueue.add(request);
}
}
+
public void offer(SendLogRequest request) {
long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
@@ -347,10 +354,13 @@ public class LogDispatcher {
request.getVotingLog().getLog().getCreateTime());
long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
- request.getVotingLog().getLog()
+ request
+ .getVotingLog()
+ .getLog()
.setByteSize(request.getAppendEntryRequest().entry.capacity());
if (clusterConfig.isUseVGRaft()) {
- request.getAppendEntryRequest()
+ request
+ .getAppendEntryRequest()
.setEntryHash(request.getAppendEntryRequest().entry.hashCode());
}
Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
@@ -474,9 +484,6 @@ public class LogDispatcher {
logRequest.newLeaderTerm,
logRequest.quorumSize);
// TODO add async interface
- if (getSyncClient() == null) {
- setSyncClient(member.getSyncClient(receiver));
- }
int retries = 5;
try {
@@ -484,8 +491,12 @@ public class LogDispatcher {
for (int i = 0; i < retries; i++) {
int concurrentSender = concurrentSenderNum.incrementAndGet();
Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
- AppendEntryResult result = getSyncClient().appendEntry(logRequest.appendEntryRequest,
- logRequest.isVerifier);
+ Client client = getSyncClient();
+ if (client == null) {
+ continue;
+ }
+ AppendEntryResult result;
+ result = client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier);
concurrentSenderNum.decrementAndGet();
if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
Thread.sleep(100);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 29a72f1612..9b180d9073 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -19,23 +19,23 @@
package org.apache.iotdb.cluster.log;
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
public class VotingLogList {
@@ -45,15 +45,35 @@ public class VotingLogList {
private int quorumSize;
private RaftMember member;
private Map<Integer, Long> stronglyAcceptedIndices = new ConcurrentHashMap<>();
+ private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
public VotingLogList(int quorumSize, RaftMember member) {
this.quorumSize = quorumSize;
this.member = member;
+ ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+ service,
+ () -> {
+ long newCommitIndex = computeNewCommitIndex();
+ if (newCommitIndex > member.getLogManager().getCommitLogIndex()) {
+ synchronized (member.getLogManager()) {
+ try {
+ member.getLogManager().commitTo(newCommitIndex);
+ } catch (LogExecutionException e) {
+ logger.error("Fail to commit {}", newCommitIndex, e);
+ }
+ }
+ }
+ },
+ 0,
+ 1,
+ TimeUnit.MILLISECONDS);
}
-
private long computeNewCommitIndex() {
List<Entry<Integer, Long>> nodeIndices = new ArrayList<>(stronglyAcceptedIndices.entrySet());
+ if (nodeIndices.size() < quorumSize) {
+ return -1;
+ }
nodeIndices.sort(Entry.comparingByValue());
return nodeIndices.get(quorumSize - 1).getValue();
}
@@ -71,24 +91,15 @@ public class VotingLogList {
public void onStronglyAccept(long index, long term, Node acceptingNode, ByteBuffer signature) {
logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNode);
- stronglyAcceptedIndices.compute(acceptingNode.nodeIdentifier, (nid, idx) -> {
- if (idx == null) {
- return index;
- } else {
- return Math.max(index, idx);
- }
- });
-
- long newCommitIndex = computeNewCommitIndex();
- if (newCommitIndex > member.getCommitIndex()) {
- synchronized (member.getLogManager()) {
- try {
- member.getLogManager().commitTo(newCommitIndex);
- } catch (LogExecutionException e) {
- logger.error("Fail to commit {}", newCommitIndex, e);
- }
- }
- }
+ stronglyAcceptedIndices.compute(
+ acceptingNode.nodeIdentifier,
+ (nid, idx) -> {
+ if (idx == null) {
+ return index;
+ } else {
+ return Math.max(index, idx);
+ }
+ });
}
public int totalAcceptedNodeNum(VotingLog log) {
@@ -96,7 +107,7 @@ public class VotingLogList {
int num = log.getWeaklyAcceptedNodeIds().size();
for (Entry<Integer, Long> entry : stronglyAcceptedIndices.entrySet()) {
if (entry.getValue() >= index) {
- num ++;
+ num++;
}
}
return num;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
index f60523a514..ffae4ccb37 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -77,8 +77,8 @@ public class BlockingLogAppender implements LogAppender {
// TODO: Consider memory footprint to execute a precise rejection
if ((logManager.getCommitLogIndex() - logManager.getMaxHaveAppliedCommitIndex())
<= ClusterDescriptor.getInstance()
- .getConfig()
- .getUnAppliedRaftLogNumForRejectThreshold()) {
+ .getConfig()
+ .getUnAppliedRaftLogNumForRejectThreshold()) {
synchronized (logManager) {
success =
logManager.maybeAppend(
@@ -97,7 +97,6 @@ public class BlockingLogAppender implements LogAppender {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
-
}
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
if (success != -1) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index 2f4d21dbf0..9303580d31 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -136,8 +136,8 @@ public class SlidingWindowLogAppender implements LogAppender {
// TODO: Consider memory footprint to execute a precise rejection
if ((logManager.getCommitLogIndex() - logManager.getMaxHaveAppliedCommitIndex())
<= ClusterDescriptor.getInstance()
- .getConfig()
- .getUnAppliedRaftLogNumForRejectThreshold()) {
+ .getConfig()
+ .getUnAppliedRaftLogNumForRejectThreshold()) {
synchronized (logManager) {
success =
logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
@@ -155,7 +155,6 @@ public class SlidingWindowLogAppender implements LogAppender {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
-
}
if (success != -1) {
moveWindowRightward(flushPos);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index f17ce02b13..ff04103529 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index d1b682257d..eaee82bf39 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -66,10 +65,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
LoggerFactory.getLogger(FilePartitionedSnapshotLogManager.class);
public FilePartitionedSnapshotLogManager(
- PartitionTable partitionTable,
- Node header,
- Node thisNode,
- DataGroupMember dataGroupMember) {
+ PartitionTable partitionTable, Node header, Node thisNode, DataGroupMember dataGroupMember) {
super(
createLogApplier(dataGroupMember),
partitionTable,
@@ -80,8 +76,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
dataGroupMember.getStateMachine());
}
- private static LogApplier createLogApplier(
- DataGroupMember dataGroupMember) {
+ private static LogApplier createLogApplier(DataGroupMember dataGroupMember) {
LogApplier applier = new DataLogApplier(dataGroupMember);
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
&& ClusterDescriptor.getInstance().getConfig().getReplicationNum() != 1) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index 25a7e02ac7..0cb405f76a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
import org.apache.iotdb.commons.auth.entity.Role;
import org.apache.iotdb.commons.auth.entity.User;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
import org.apache.iotdb.db.service.IoTDB;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
index e946ea06c1..0f41aae793 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.server.handlers.caller;
-import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -29,6 +28,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
@@ -54,8 +54,7 @@ public class LogCatchUpHandler implements AsyncMethodCallback<AppendEntryResult>
private void processStrongAccept(ByteBuffer signature) {
raftMember
.getVotingLogList()
- .onStronglyAccept(log.getCurrLogIndex(), log.getCurrLogTerm(), follower,
- signature);
+ .onStronglyAccept(log.getCurrLogIndex(), log.getCurrLogTerm(), follower, signature);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 027b706af0..5c29f6e3f3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -141,8 +141,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
/**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so
- * on.
+ * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
*/
@SuppressWarnings("java:S3077") // reference volatile is enough
public abstract class RaftMember implements RaftMemberMBean {
@@ -177,34 +176,24 @@ public abstract class RaftMember implements RaftMemberMBean {
* on this may be woken.
*/
private final Object waitLeaderCondition = new Object();
- /**
- * the lock is to make sure that only one thread can apply snapshot at the same time
- */
+ /** the lock is to make sure that only one thread can apply snapshot at the same time */
private final Lock snapshotApplyLock = new ReentrantLock();
private final Object heartBeatWaitObject = new Object();
protected Node thisNode = ClusterIoTDB.getInstance().getThisNode();
- /**
- * the nodes that belong to the same raft group as thisNode.
- */
+ /** the nodes that belong to the same raft group as thisNode. */
protected PartitionGroup allNodes;
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- /**
- * the name of the member, to distinguish several members in the logs.
- */
+ /** the name of the member, to distinguish several members in the logs. */
ConsensusGroupId groupId;
String name;
- /**
- * to choose nodes to send request of joining cluster randomly.
- */
+ /** to choose nodes to send request of joining cluster randomly. */
Random random = new Random();
- /**
- * when the node is a leader, this map is used to track log progress of each follower.
- */
+ /** when the node is a leader, this map is used to track log progress of each follower. */
Map<Node, PeerInfo> peerMap;
/**
* the current term of the node, this object also works as lock of some transactions of the member
@@ -226,9 +215,7 @@ public abstract class RaftMember implements RaftMemberMBean {
*/
volatile long lastHeartbeatReceivedTime;
- /**
- * the raft logs are all stored and maintained in the log manager
- */
+ /** the raft logs are all stored and maintained in the log manager */
protected RaftLogManager logManager;
/**
@@ -247,9 +234,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* member by comparing it with the current last log index.
*/
long lastReportedLogIndex;
- /**
- * the thread pool that runs catch-up tasks
- */
+ /** the thread pool that runs catch-up tasks */
private ExecutorService catchUpService;
/**
* lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -280,15 +265,12 @@ public abstract class RaftMember implements RaftMemberMBean {
* one slow node.
*/
private ExecutorService serialToParallelPool;
- /**
- * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
- */
+ /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
private ExecutorService commitLogPool;
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
- * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
- * logs.
+ * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
*/
private volatile LogDispatcher logDispatcher;
@@ -300,9 +282,7 @@ public abstract class RaftMember implements RaftMemberMBean {
protected IStateMachine stateMachine;
- /**
- * (logIndex, logTerm) -> append handler
- */
+ /** (logIndex, logTerm) -> append handler */
protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers =
new ConcurrentHashMap<>();
@@ -321,7 +301,6 @@ public abstract class RaftMember implements RaftMemberMBean {
// VG-Raft related
private TrustValueHolder trustValueHolder = null;
-
protected RaftMember(IStateMachine stateMachine) {
this.stateMachine = stateMachine;
}
@@ -640,7 +619,8 @@ public abstract class RaftMember implements RaftMemberMBean {
* Process an AppendEntryRequest. First check the term of the leader, then parse the log and
* finally see if we can find a position to append the log.
*/
- public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier) throws UnknownLogTypeException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier)
+ throws UnknownLogTypeException {
long operationStartTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
logger.debug("{} received an AppendEntryRequest: {}", name, request);
if (logger.isDebugEnabled()) {
@@ -665,11 +645,13 @@ public abstract class RaftMember implements RaftMemberMBean {
if (request.entryHash != request.entry.hashCode()) {
return new AppendEntryResult(Response.RESPONSE_HASH_INCONSISTENT).setHeader(getHeader());
}
- if (!KeyManager.INSTANCE.verifyNodeSignature(request.leader,
- request.leaderSignature.array(), request.leaderSignature.arrayOffset(),
+ if (!KeyManager.INSTANCE.verifyNodeSignature(
+ request.leader,
+ request.leaderSignature.array(),
+ request.leaderSignature.arrayOffset(),
request.leaderSignature.remaining())) {
- return new AppendEntryResult(Response.RESPONSE_SIGNATURE_INCONSISTENT).setHeader(
- getHeader());
+ return new AppendEntryResult(Response.RESPONSE_SIGNATURE_INCONSISTENT)
+ .setHeader(getHeader());
}
}
@@ -693,17 +675,13 @@ public abstract class RaftMember implements RaftMemberMBean {
return result;
}
- /**
- * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
- */
+ /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
public AppendEntryResult appendEntries(AppendEntriesRequest request)
throws UnknownLogTypeException {
return appendEntriesInternal(request);
}
- /**
- * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
- */
+ /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
@@ -865,22 +843,16 @@ public abstract class RaftMember implements RaftMemberMBean {
return lastCatchUpResponseTime;
}
- /**
- * Sub-classes will add their own process of HeartBeatResponse in this method.
- */
- public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
- }
+ /** Sub-classes will add their own process of HeartBeatResponse in this method. */
+ public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
- /**
- * The actions performed when the node wins in an election (becoming a leader).
- */
- public void onElectionWins() {
- }
+ /** The actions performed when the node wins in an election (becoming a leader). */
+ public void onElectionWins() {}
/**
* Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
- * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
- * a part of data is in the snapshot, then it is not in the logs.
+ * follower. If some of the required logs are removed, also send the snapshot. <br>
+ * notice that if a part of data is in the snapshot, then it is not in the logs.
*/
public void catchUp(Node follower, long lastLogIdx) {
// for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -963,9 +935,7 @@ public abstract class RaftMember implements RaftMemberMBean {
"%s:%s=%s", "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, "Engine");
}
- /**
- * call back after syncLeader
- */
+ /** call back after syncLeader */
public interface CheckConsistency {
/**
@@ -974,7 +944,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
* @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
- * implements.
+ * implements.
*/
void postCheckConsistency(long leaderCommitId, long localAppliedId)
throws CheckConsistencyException;
@@ -996,7 +966,7 @@ public abstract class RaftMember implements RaftMemberMBean {
if (leaderCommitId == Long.MAX_VALUE
|| leaderCommitId == Long.MIN_VALUE
|| leaderCommitId - localAppliedId
- > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+ > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
}
}
@@ -1029,7 +999,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* @param checkConsistency check after syncleader
* @return true if the node has caught up, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
if (character == NodeCharacter.LEADER) {
@@ -1048,9 +1018,7 @@ public abstract class RaftMember implements RaftMemberMBean {
return waitUntilCatchUp(checkConsistency);
}
- /**
- * Wait until the leader of this node becomes known or time out.
- */
+ /** Wait until the leader of this node becomes known or time out. */
public void waitLeader() {
long startTime = System.currentTimeMillis();
while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -1077,7 +1045,7 @@ public abstract class RaftMember implements RaftMemberMBean {
*
* @return true if this node has caught up before timeout, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
@@ -1110,7 +1078,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* sync local applyId to leader commitId
*
* @param leaderCommitId leader commit id
- * @param fastFail if enable, when log differ too much, return false directly.
+ * @param fastFail if enable, when log differ too much, return false directly.
* @return true if leaderCommitId <= localAppliedId
*/
public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
@@ -1163,7 +1131,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* call this method. Will commit the log locally and send it to followers
*
* @return OK if over half of the followers accept the log or null if the leadership is lost
- * during the appending
+ * during the appending
*/
public TSStatus processPlanLocally(IConsensusRequest request) {
if (USE_LOG_DISPATCHER) {
@@ -1194,7 +1162,7 @@ public abstract class RaftMember implements RaftMemberMBean {
// we need to return error code to the client as in server mode
if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
&& log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
logger.error(
"Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ "or reduce the size of requests you send.");
@@ -1268,7 +1236,7 @@ public abstract class RaftMember implements RaftMemberMBean {
// just like processPlanLocally,we need to check the size of log
if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
&& log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
logger.error(
"Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ "or reduce the size of requests you send.");
@@ -1431,9 +1399,7 @@ public abstract class RaftMember implements RaftMemberMBean {
return peerMap.computeIfAbsent(node, r -> new PeerInfo(getLogManager().getLastLogIndex()));
}
- /**
- * @return true if there is a log whose index is "index" and term is "term", false otherwise
- */
+ /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
public boolean matchLog(long index, long term) {
boolean matched = logManager.matchTerm(term, index);
logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1452,18 +1418,15 @@ public abstract class RaftMember implements RaftMemberMBean {
return syncLock;
}
- /**
- * Sub-classes will add their own process of HeartBeatRequest in this method.
- */
- void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
- }
+ /** Sub-classes will add their own process of HeartBeatRequest in this method. */
+ void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
/**
* Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
* request is valid.
*
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
- * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+ * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
long checkElectorLogProgress(ElectionRequest electionRequest) {
@@ -1507,7 +1470,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* lastLogIndex is smaller than the voter's Otherwise accept the election.
*
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
- * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+ * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
long checkLogProgress(long lastLogIndex, long lastLogTerm) {
long response;
@@ -1524,10 +1487,10 @@ public abstract class RaftMember implements RaftMemberMBean {
/**
* Forward a non-query plan to a node using the default client.
*
- * @param plan a non-query plan
- * @param node cannot be the local node
+ * @param plan a non-query plan
+ * @param node cannot be the local node
* @param header must be set for data group communication, set to null for meta group
- * communication
+ * communication
* @return a TSStatus indicating if the forwarding is successful.
*/
public TSStatus forwardPlan(IConsensusRequest plan, Node node, RaftNode header) {
@@ -1558,7 +1521,7 @@ public abstract class RaftMember implements RaftMemberMBean {
/**
* Forward a non-query plan to "receiver" using "client".
*
- * @param plan a non-query plan
+ * @param plan a non-query plan
* @param header to determine which DataGroupMember of "receiver" will process the request.
* @return a TSStatus indicating if the forwarding is successful.
*/
@@ -1637,7 +1600,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* Get an asynchronous thrift client of the given node.
*
* @return an asynchronous thrift client or null if the caller tries to connect the local node or
- * the node cannot be reached.
+ * the node cannot be reached.
*/
public AsyncClient getAsyncClient(Node node) {
try {
@@ -1668,7 +1631,7 @@ public abstract class RaftMember implements RaftMemberMBean {
try {
return clientManager.borrowSyncClient(node, getClientCategory());
} catch (IOException e) {
- logger.error("borrow sync client fail", e);
+ logger.debug("borrow sync client fail", e);
return null;
}
}
@@ -1773,13 +1736,14 @@ public abstract class RaftMember implements RaftMemberMBean {
synchronized (log.getLog()) {
while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
|| (!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
- && getCommitIndex() < log.getLog().getCurrLogIndex()
- || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
- && log.getSignatures().size() < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
- && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
- || (totalAccepted < quorumSize))
- && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
- && !log.isHasFailed()) {
+ && getCommitIndex() < log.getLog().getCurrLogIndex()
+ || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
+ && log.getSignatures().size()
+ < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
+ && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
+ || (totalAccepted < quorumSize))
+ && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
+ && !log.isHasFailed()) {
try {
log.getLog().wait(waitTime);
} catch (InterruptedException e) {
@@ -1831,12 +1795,12 @@ public abstract class RaftMember implements RaftMemberMBean {
if (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
|| ((!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
- && log.getLog().getCurrLogIndex() > getCommitIndex()
- || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
- && log.getSignatures().size() < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
- && (!ENABLE_WEAK_ACCEPTANCE
- || (totalAccepted < quorumSize))
- && !log.isHasFailed())) {
+ && log.getLog().getCurrLogIndex() > getCommitIndex()
+ || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
+ && log.getSignatures().size()
+ < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
+ && (!ENABLE_WEAK_ACCEPTANCE || (totalAccepted < quorumSize))
+ && !log.isHasFailed())) {
waitAppendResultLoop(log, quorumSize);
}
@@ -1958,7 +1922,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* heartbeat timer.
*
* @param fromLeader true if the request is from a leader, false if the request is from an
- * elector.
+ * elector.
*/
public void stepDown(long newTerm, boolean fromLeader) {
synchronized (logManager) {
@@ -1994,9 +1958,7 @@ public abstract class RaftMember implements RaftMemberMBean {
this.thisNode = thisNode;
}
- /**
- * @return the header of the data raft group or null if this is in a meta group.
- */
+ /** @return the header of the data raft group or null if this is in a meta group. */
public RaftNode getHeader() {
return null;
}
@@ -2156,9 +2118,7 @@ public abstract class RaftMember implements RaftMemberMBean {
return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
}
- /**
- * Send "log" to "node".
- */
+ /** Send "log" to "node". */
public void sendLogToFollower(
VotingLog log,
Node node,
@@ -2228,7 +2188,8 @@ public abstract class RaftMember implements RaftMemberMBean {
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
- int quorumSize, boolean isVerifier) {
+ int quorumSize,
+ boolean isVerifier) {
Client client = getSyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 0eafbd76cd..37261fe9bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -76,7 +76,8 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
@Override
public void appendEntry(
AppendEntryRequest request,
- boolean isVerifier, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+ boolean isVerifier,
+ AsyncMethodCallback<AppendEntryResult> resultHandler) {
try {
resultHandler.onComplete(member.appendEntry(request, isVerifier));
} catch (UnknownLogTypeException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index 1b2c9be4eb..63297012a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -70,7 +70,8 @@ public abstract class BaseSyncService implements RaftService.Iface {
}
@Override
- public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier) throws TException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier)
+ throws TException {
try {
return member.appendEntry(request, isVerifier);
} catch (UnknownLogTypeException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index a6c0f47224..5dbefe805d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -502,7 +502,5 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
}
@Override
- public void ping(AsyncMethodCallback<Void> resultHandler) throws TException {
-
- }
+ public void ping(AsyncMethodCallback<Void> resultHandler) throws TException {}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
index 6b7efa2365..ee56ffbf63 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
@@ -100,8 +100,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
}
@Override
- public void start() throws StartupException {
- }
+ public void start() throws StartupException {}
@Override
public void stop() {
@@ -141,8 +140,8 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
public void waitPartitionTableReady() throws PartitionTableUnavailableException {
long waitStart = System.currentTimeMillis();
while (!metaGroupMember.isReady()
- && (System.currentTimeMillis() - waitStart) < ClusterDescriptor.getInstance().getConfig()
- .getConnectionTimeoutInMS()) {
+ && (System.currentTimeMillis() - waitStart)
+ < ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@@ -194,10 +193,10 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
}
/**
- * @param header the header of the group which the local node is in
+ * @param header the header of the group which the local node is in
* @param resultHandler can be set to null if the request is an internal request
- * @param request the toString() of this parameter should explain what the request is and it
- * is only used in logs for tracing
+ * @param request the toString() of this parameter should explain what the request is and it is
+ * only used in logs for tracing
* @return
*/
public <T> DataGroupMember getDataMember(
@@ -351,9 +350,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
}
}
- /**
- * Make sure the group will not receive new raft logs.
- */
+ /** Make sure the group will not receive new raft logs. */
private void removeMember(
RaftNode header, DataGroupMember dataGroupMember, boolean removedGroup) {
dataGroupMember.setReadOnly();
@@ -362,14 +359,14 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
} else {
if (dataGroupMember.getCharacter() != NodeCharacter.LEADER) {
new Thread(
- () -> {
- try {
- dataGroupMember.syncLeader(null);
- dataGroupMember.stop();
- } catch (CheckConsistencyException e) {
- logger.warn("Failed to check consistency.", e);
- }
- })
+ () -> {
+ try {
+ dataGroupMember.syncLeader(null);
+ dataGroupMember.stop();
+ } catch (CheckConsistencyException e) {
+ logger.warn("Failed to check consistency.", e);
+ }
+ })
.start();
}
}
@@ -488,9 +485,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
this.partitionTable = partitionTable;
}
- /**
- * @return The reports of every DataGroupMember in this node.
- */
+ /** @return The reports of every DataGroupMember in this node. */
public List<DataMemberReport> genMemberReports() {
List<DataMemberReport> dataMemberReports = new ArrayList<>();
for (DataGroupMember value : headerGroupMap.values()) {
@@ -514,8 +509,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
private static class InstanceHolder {
- private InstanceHolder() {
- }
+ private InstanceHolder() {}
private static final DataGroupEngine Instance = new DataGroupEngine();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index d449542ded..d39d527d95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -96,7 +96,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void appendEntry(
- AppendEntryRequest request, boolean isVerifier,
+ AppendEntryRequest request,
+ boolean isVerifier,
AsyncMethodCallback<AppendEntryResult> resultHandler)
throws TException {
DataAsyncService service =
@@ -789,9 +790,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public void ping() throws TException {
-
- }
+ public void ping() throws TException {}
@Override
public void ping(AsyncMethodCallback<Void> resultHandler) throws TException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 47801ff829..4995b07a6b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -457,7 +457,5 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
}
@Override
- public void ping() throws TException {
-
- }
+ public void ping() throws TException {}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 736a083abc..f406078d35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -62,7 +62,8 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
@Override
public void appendEntry(
AppendEntryRequest request,
- boolean isVerifier, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+ boolean isVerifier,
+ AsyncMethodCallback<AppendEntryResult> resultHandler) {
// if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded
// locally, but the partition table is not verified), we do not handle the RPC requests.
if (!metaGroupMember.isReady() && metaGroupMember.getPartitionTable() == null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index ed5f27974f..e782adab3f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -62,7 +62,8 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
// behavior of followers
@Override
- public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier) throws TException {
+ public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier)
+ throws TException {
// if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded
// locally, but the partition table is not verified), we do not handle the RPC requests.
if (!metaGroupMember.isReady()) {
@@ -259,7 +260,5 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
}
@Override
- public void ping() throws TException {
-
- }
+ public void ping() throws TException {}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
index 834f5d5500..4d126607f9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.service.JMXService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -32,6 +35,7 @@ import java.util.concurrent.TimeUnit;
public class WrappedThreadPoolExecutor extends ThreadPoolExecutor
implements WrappedThreadPoolExecutorMBean {
private final String mbeanName;
+ private static final Logger logger = LoggerFactory.getLogger(WrappedThreadPoolExecutor.class);
public WrappedThreadPoolExecutor(
int corePoolSize,
@@ -60,7 +64,11 @@ public class WrappedThreadPoolExecutor extends ThreadPoolExecutor
this.mbeanName =
String.format(
"%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE, IoTDBConstant.JMX_TYPE, mbeanName);
- JMXService.registerMBean(this, this.mbeanName);
+ try {
+ JMXService.registerMBean(this, this.mbeanName);
+ } catch (Exception e) {
+ logger.warn("Cannot register pool {}", mbeanName, e);
+ }
}
@Override