You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/01/26 12:15:22 UTC
[ignite-3] branch main updated: IGNITE-15705 Implemented election timeout auto-adjusting mechanism. Fixes #481
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f51281b IGNITE-15705 Implemented election timeout auto-adjusting mechanism. Fixes #481
f51281b is described below
commit f51281b2d5b697be98b03de8582839529fd7e192
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Wed Jan 26 15:14:59 2022 +0300
IGNITE-15705 Implemented election timeout auto-adjusting mechanism. Fixes #481
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 141 +++++++++++++++------
.../java/org/apache/ignite/internal/raft/Loza.java | 16 +--
.../internal/raft/server/impl/JraftServerImpl.java | 17 ++-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 57 +++++++++
.../ignite/raft/jraft/option/NodeOptions.java | 13 ++
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 26 ++--
.../util/ExponentialBackoffTimeoutStrategy.java | 79 ++++++++++++
.../raft/jraft/util/NoopTimeoutStrategy.java | 25 ++++
.../ignite/raft/jraft/util/TimeoutStrategy.java | 33 +++++
.../apache/ignite/raft/jraft/core/TestCluster.java | 3 +
.../ExponentialBackoffTimeoutStrategyTest.java | 54 ++++++++
11 files changed, 404 insertions(+), 60 deletions(-)
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 18b08eb..e5e97b0 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -18,6 +18,7 @@ package org.apache.ignite.raft.jraft.core;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
+import static org.apache.ignite.raft.jraft.test.TestUtils.sender;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -51,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -99,6 +101,7 @@ import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.apache.ignite.utils.ClusterServiceTestUtils;
@@ -1488,20 +1491,15 @@ public class ItNodeTest {
List<Node> followers = cluster.getFollowers();
- for (Node follower : followers) {
- NodeImpl follower0 = (NodeImpl) follower;
- DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
- RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
- rpcClientEx.blockMessages((msg, nodeId) -> {
- if (msg instanceof RpcRequests.RequestVoteRequest) {
- RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
+ blockMessagesOnFollowers(followers, (msg, nodeId) -> {
+ if (msg instanceof RpcRequests.RequestVoteRequest) {
+ RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
- return !msg0.preVote();
- }
+ return !msg0.preVote();
+ }
- return false;
- });
- }
+ return false;
+ });
// stop leader
LOG.warn("Stop leader {}", leader.getNodeId().getPeerId());
@@ -1511,12 +1509,7 @@ public class ItNodeTest {
assertFalse(followers.isEmpty());
sendTestTaskAndWait("follower apply ", followers.get(0), -1); // Should fail, because no leader.
- for (Node follower : followers) {
- NodeImpl follower0 = (NodeImpl) follower;
- DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
- RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
- rpcClientEx.stopBlock();
- }
+ stopBlockingMessagesOnFollowers(followers);
// elect new leader
cluster.waitLeader();
@@ -2747,7 +2740,7 @@ public class ItNodeTest {
// Block only one vote message.
for (NodeImpl node : nodes) {
- RpcClientEx rpcClientEx = TestUtils.sender(node);
+ RpcClientEx rpcClientEx = sender(node);
rpcClientEx.recordMessages((msg, nodeId) -> true);
rpcClientEx.blockMessages((msg, nodeId) -> {
if (msg instanceof RpcRequests.RequestVoteRequest) {
@@ -2776,7 +2769,7 @@ public class ItNodeTest {
Node leader = cluster.getLeader();
cluster.ensureLeader(leader);
- RpcClientEx client = TestUtils.sender(leader);
+ RpcClientEx client = sender(leader);
client.stopBlock(1); // Unblock vote message.
@@ -3367,20 +3360,15 @@ public class ItNodeTest {
List<Node> followers = cluster.getFollowers();
- for (Node follower : followers) {
- NodeImpl follower0 = (NodeImpl) follower;
- DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
- RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
- rpcClientEx.blockMessages((msg, nodeId) -> {
- if (msg instanceof RpcRequests.RequestVoteRequest) {
- RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg;
+ blockMessagesOnFollowers(followers, (msg, nodeId) -> {
+ if (msg instanceof RpcRequests.RequestVoteRequest) {
+ RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest) msg;
- return !msg0.preVote();
- }
+ return !msg0.preVote();
+ }
- return false;
- });
- }
+ return false;
+ });
LOG.warn("Stop leader {}, curTerm={}", leader.getNodeId().getPeerId(), ((NodeImpl) leader).getCurrentTerm());
@@ -3392,17 +3380,84 @@ public class ItNodeTest {
assertNull(cluster.getLeader());
+ stopBlockingMessagesOnFollowers(followers);
+
+ // elect new leader
+ cluster.waitLeader();
+ leader = cluster.getLeader();
+ LOG.info("Elect new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm());
+ }
+
+ @Test
+ public void testElectionTimeoutAutoAdjustWhenBlockedAllMessages() throws Exception {
+ testElectionTimeoutAutoAdjustWhenBlockedMessages((msg, nodeId) -> true);
+ }
+
+    /**
+     * Stops the leader while every follower blocks only real (non-pre-vote) {@code RequestVoteRequest} messages, and checks
+     * that the election timeout grows and is restored once a new leader is elected.
+     */
+    @Test
+    public void testElectionTimeoutAutoAdjustWhenBlockedRequestVoteMessages() throws Exception {
+        // Pre-vote requests are still sent; only the actual vote requests are blocked.
+        testElectionTimeoutAutoAdjustWhenBlockedMessages((msg, nodeId) ->
+            msg instanceof RpcRequests.RequestVoteRequest && !((RpcRequests.RequestVoteRequest) msg).preVote());
+    }
+
+ private void testElectionTimeoutAutoAdjustWhenBlockedMessages(BiPredicate<Object, String> blockingPredicate) throws Exception {
+ List<PeerId> peers = TestUtils.generatePeers(4);
+ int maxElectionRoundsWithoutAdjusting = 3;
+
+ cluster = new TestCluster("unittest", dataPath, peers, new LinkedHashSet<>(), ELECTION_TIMEOUT_MILLIS,
+ opts -> opts.setElectionTimeoutStrategy(new ExponentialBackoffTimeoutStrategy(11_000, maxElectionRoundsWithoutAdjusting)),
+ testInfo);
+
+ for (PeerId peer : peers) {
+ assertTrue(cluster.start(peer.getEndpoint()));
+ }
+
+ cluster.waitLeader();
+
+ Node leader = cluster.getLeader();
+
+ int initElectionTimeout = leader.getOptions().getElectionTimeoutMs();
+
+ LOG.warn("Current leader {}, electTimeout={}", leader.getNodeId().getPeerId(), leader.getOptions().getElectionTimeoutMs());
+
+ List<Node> followers = cluster.getFollowers();
+
for (Node follower : followers) {
NodeImpl follower0 = (NodeImpl) follower;
- DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService();
- RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient();
- rpcClientEx.stopBlock();
+
+ assertEquals(initElectionTimeout, follower0.getOptions().getElectionTimeoutMs());
}
+ blockMessagesOnFollowers(followers, blockingPredicate);
+
+ LOG.warn("Stop leader {}, curTerm={}", leader.getNodeId().getPeerId(), ((NodeImpl) leader).getCurrentTerm());
+
+ assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint()));
+
+ assertNull(cluster.getLeader());
+
+ assertTrue(waitForCondition(() -> followers.stream().allMatch(f -> f.getOptions().getElectionTimeoutMs() > initElectionTimeout),
+ (long) maxElectionRoundsWithoutAdjusting
+ // need to multiply to 2 because stepDown happens after voteTimer timeout
+ * (initElectionTimeout + followers.get(0).getOptions().getRaftOptions().getMaxElectionDelayMs()) * 2));
+
+ stopBlockingMessagesOnFollowers(followers);
+
// elect new leader
cluster.waitLeader();
leader = cluster.getLeader();
- LOG.info("Elect new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm());
+
+ LOG.info("Elected new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm());
+
+ assertTrue(
+ waitForCondition(() -> followers.stream().allMatch(f -> f.getOptions().getElectionTimeoutMs() == initElectionTimeout),
+ 3_000));
}
/**
@@ -3686,4 +3741,18 @@ public class ItNodeTest {
latch.await();
return success.get();
}
+
+    /**
+     * Makes the RPC client of every given follower block messages matching the predicate.
+     *
+     * @param followers Followers whose RPC clients should start blocking.
+     * @param blockingPredicate Predicate over a message and a node id; matching messages are blocked.
+     */
+    private void blockMessagesOnFollowers(List<Node> followers, BiPredicate<Object, String> blockingPredicate) {
+        followers.forEach(follower -> sender(follower).blockMessages(blockingPredicate));
+    }
+
+    /**
+     * Makes the RPC client of every given follower stop blocking messages.
+     *
+     * @param followers Followers whose RPC clients should stop blocking.
+     */
+    private void stopBlockingMessagesOnFollowers(List<Node> followers) {
+        followers.forEach(follower -> sender(follower).stopBlock());
+    }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 48bb9e4..66d51ed 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -62,12 +62,10 @@ public class Loza implements IgniteComponent {
private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
/** Timeout. */
- // TODO: IGNITE-15705 Correct value should be investigated
- private static final int TIMEOUT = 10000;
+ private static final int RETRY_TIMEOUT = 10000;
/** Network timeout. */
- // TODO: IGNITE-15705 Correct value should be investigated
- private static final int NETWORK_TIMEOUT = 3000;
+ private static final int RPC_TIMEOUT = 3000;
/** Retry delay. */
private static final int DELAY = 200;
@@ -205,8 +203,8 @@ public class Loza implements IgniteComponent {
groupId,
clusterNetSvc,
FACTORY,
- TIMEOUT,
- NETWORK_TIMEOUT,
+ RETRY_TIMEOUT,
+ RPC_TIMEOUT,
peers,
true,
DELAY,
@@ -274,7 +272,7 @@ public class Loza implements IgniteComponent {
groupId,
clusterNetSvc,
FACTORY,
- TIMEOUT,
+ RETRY_TIMEOUT,
peers,
true,
DELAY,
@@ -323,8 +321,8 @@ public class Loza implements IgniteComponent {
groupId,
clusterNetSvc,
FACTORY,
- 10 * TIMEOUT,
- 10 * NETWORK_TIMEOUT,
+ 10 * RETRY_TIMEOUT,
+ 10 * RPC_TIMEOUT,
expectedPeers,
true,
DELAY,
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index d3a0131..b3dba98 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -64,6 +64,7 @@ import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.JDKMarshaller;
import org.jetbrains.annotations.Nullable;
@@ -121,8 +122,20 @@ public class JraftServerImpl implements RaftServer {
this.opts.setSharedPools(true);
if (opts.getServerName() == null) {
- opts.setServerName(service.localConfiguration().getName());
- }
+ this.opts.setServerName(service.localConfiguration().getName());
+ }
+
+ /*
+ Timeout increasing strategy for election timeout. Adjusting happens according to
+ {@link org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy} when a leader is not elected, after several
+ consecutive unsuccessful leader elections, which could be controlled through {@code roundsWithoutAdjusting} parameter of
+ {@link org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy}.
+ Max timeout value that {@link org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy} could produce
+ must be more than timeout of a membership protocol to remove failed node from the cluster.
+ In our case, we may assume that 11s could be enough as far as 11s is greater
+ than suspicion timeout for the 1000 nodes cluster with ping interval equals 500ms.
+ */
+ this.opts.setElectionTimeoutStrategy(new ExponentialBackoffTimeoutStrategy(11_000, 3));
}
/** {@inheritDoc} */
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index e42a33d..6c62a7f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -116,6 +116,7 @@ import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
import org.apache.ignite.raft.jraft.util.ThreadHelper;
import org.apache.ignite.raft.jraft.util.ThreadId;
+import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
@@ -147,6 +148,10 @@ public class NodeImpl implements Node, RaftServerService {
private final Ballot prevVoteCtx = new Ballot();
private ConfigurationEntry conf;
private StopTransferArg stopTransferArg;
+ /** Whether the election timeout was changed by adjustElectionTimeout() and has not been restored yet. */
+ private boolean electionAdjusted;
+ /** Election round counter passed to the election timeout strategy; zeroed by resetElectionTimeoutToInitial(). */
+ private long electionRound;
+ /** Election timeout captured before the first adjusting; restored by resetElectionTimeoutToInitial(). */
+ private int initialElectionTimeout;
+
/**
* Raft group and node options and identifier
*/
@@ -598,6 +603,7 @@ public class NodeImpl implements Node, RaftServerService {
}
doUnlock = false;
+ adjustElectionTimeout();
preVote();
}
@@ -609,6 +615,50 @@ public class NodeImpl implements Node, RaftServerService {
}
/**
+ * Method that adjusts election timeout after several consecutive unsuccessful leader elections according to {@link TimeoutStrategy}
+ * <p>
+ * Notes about general algorithm: The main idea is that in a stable cluster election timeout should be relatively small, but when
+ * something prevents elections from completion, like an unstable network or long GC pauses, we don't want to have a lot of
+ * elections, so election timeout is adjusted. Hence, the upper bound of the election timeout adjusting is the value, which is enough to
+ * elect a leader or handle problems that prevent a successful leader election.
+ * <p>
+ * Leader election timeout is set to an initial value after a successful election of a leader.
+ */
+    private void adjustElectionTimeout() {
+        electionRound++;
+
+        if (electionRound > 1) {
+            LOG.info("Unsuccessful election round number {}", electionRound);
+        }
+
+        int currentTimeout = options.getElectionTimeoutMs();
+
+        // Capture the configured timeout only before the first adjusting, so that the original value can be restored later.
+        if (!electionAdjusted) {
+            initialElectionTimeout = currentTimeout;
+        }
+
+        // TimeoutStrategy#nextTimeout returns an int, so no widening/narrowing is needed.
+        int timeout = options.getElectionTimeoutStrategy().nextTimeout(currentTimeout, electionRound);
+
+        if (timeout != currentTimeout) {
+            resetElectionTimeoutMs(timeout);
+
+            LOG.info("Election timeout was adjusted from {} ms to {} ms according to {}", currentTimeout, timeout,
+                options.getElectionTimeoutStrategy());
+
+            electionAdjusted = true;
+        }
+    }
+
+    /**
+     * Starts counting election rounds from scratch and, if the election timeout was adjusted, restores its initial value.
+     *
+     * <p>See {@link NodeImpl#adjustElectionTimeout()} for how the timeout gets adjusted.
+     */
+    private void resetElectionTimeoutToInitial() {
+        electionRound = 0;
+
+        if (!electionAdjusted) {
+            return;
+        }
+
+        LOG.info("Election timeout was reset to initial value due to successful leader election.");
+        resetElectionTimeoutMs(initialElectionTimeout);
+        electionAdjusted = false;
+    }
+
+ /**
* Whether to allow for launching election or not by comparing node's priority with target priority. And at the same
* time, if next leader is not elected until next election timeout, it decays its local target priority
* exponentially.
@@ -1241,6 +1291,8 @@ public class NodeImpl implements Node, RaftServerService {
this.fsmCaller.onStartFollowing(new LeaderChangeContext(newLeaderId, this.currTerm, status));
}
this.leaderId = newLeaderId.copy();
+
+ resetElectionTimeoutToInitial();
}
}
@@ -1301,6 +1353,7 @@ public class NodeImpl implements Node, RaftServerService {
throw new IllegalStateException();
}
this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
+ resetElectionTimeoutToInitial();
this.stepDownTimer.start();
}
@@ -2800,6 +2853,10 @@ public class NodeImpl implements Node, RaftServerService {
return;
}
+ // This is needed for the node, which won preVote in a previous iteration, but leader wasn't elected.
+ if (this.prevVoteCtx.isGranted())
+ adjustElectionTimeout();
+
if (this.raftOptions.isStepDownWhenVoteTimedout()) {
LOG.warn(
"Candidate node {} term {} steps down when election reaching vote timeout: fail to get quorum vote-granted.",
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 5e61cc9..fcdcc17 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -18,6 +18,8 @@ package org.apache.ignite.raft.jraft.option;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
+import org.apache.ignite.raft.jraft.util.NoopTimeoutStrategy;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -49,6 +51,8 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
// Default: 1200 (1.2s)
private int electionTimeoutMs = 1200; // follower to candidate timeout
+ // Strategy that adjusts electionTimeoutMs after consecutive unsuccessful elections.
+ // Default: NoopTimeoutStrategy, which always keeps the current timeout.
+ private TimeoutStrategy electionTimeoutStrategy = new NoopTimeoutStrategy();
+
// One node's local priority value would be set to | electionPriority |
// value when it starts up.If this value is set to 0,the node will never be a leader.
// If this node doesn't support priority election,then set this value to -1.
@@ -603,6 +607,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
nodeOptions.setSharedPools(this.isSharedPools());
nodeOptions.setRpcDefaultTimeout(this.getRpcDefaultTimeout());
nodeOptions.setRpcConnectTimeoutMs(this.getRpcConnectTimeoutMs());
+ nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
return nodeOptions;
}
@@ -634,4 +639,12 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
public List<Replicator.ReplicatorStateListener> getReplicationStateListeners() {
return replicationStateListeners;
}
+
+    /**
+     * Returns the strategy used to adjust the election timeout after consecutive unsuccessful elections.
+     *
+     * @return Election timeout strategy; {@link NoopTimeoutStrategy} unless another one was set.
+     */
+    public TimeoutStrategy getElectionTimeoutStrategy() {
+        return electionTimeoutStrategy;
+    }
+
+    /**
+     * Sets the strategy used to adjust the election timeout after consecutive unsuccessful elections.
+     *
+     * @param electionTimeoutStrategy Election timeout strategy, must not be {@code null}.
+     * @throws NullPointerException If {@code electionTimeoutStrategy} is {@code null}.
+     */
+    public void setElectionTimeoutStrategy(TimeoutStrategy electionTimeoutStrategy) {
+        // Fail fast here instead of with an NPE later, when the node tries to adjust the timeout during an election.
+        if (electionTimeoutStrategy == null) {
+            throw new NullPointerException("electionTimeoutStrategy");
+        }
+
+        this.electionTimeoutStrategy = electionTimeoutStrategy;
+    }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 9955158..c917142 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -77,7 +77,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
private volatile long timeout;
/** Timeout for network calls. */
- private final long networkTimeout;
+ private final long rpcTimeout;
/** */
private final String groupId;
@@ -120,7 +120,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
ClusterService cluster,
RaftMessagesFactory factory,
int timeout,
- int networkTimeout,
+ int rpcTimeout,
List<Peer> peers,
Peer leader,
long retryDelay,
@@ -131,7 +131,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
this.learners = Collections.emptyList();
this.factory = factory;
this.timeout = timeout;
- this.networkTimeout = networkTimeout;
+ this.rpcTimeout = rpcTimeout;
this.groupId = groupId;
this.retryDelay = retryDelay;
this.leader = leader;
@@ -145,7 +145,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @param cluster Cluster service.
* @param factory Message factory.
* @param timeout Timeout.
- * @param netTimeout Network call timeout.
+ * @param rpcTimeout Network call timeout.
* @param peers List of all peers.
* @param getLeader {@code True} to get the group's leader upon service creation.
* @param retryDelay Retry delay.
@@ -157,13 +157,13 @@ public class RaftGroupServiceImpl implements RaftGroupService {
ClusterService cluster,
RaftMessagesFactory factory,
int timeout,
- int netTimeout,
+ int rpcTimeout,
List<Peer> peers,
boolean getLeader,
long retryDelay,
ScheduledExecutorService executor
) {
- var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, netTimeout, peers, null, retryDelay, executor);
+ var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, rpcTimeout, peers, null, retryDelay, executor);
if (!getLeader)
return CompletableFuture.completedFuture(service);
@@ -424,7 +424,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
TransferLeaderRequest req = factory.transferLeaderRequest()
.groupId(groupId).leaderId(PeerId.fromPeer(newLeader).toString()).build();
- CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(newLeader.address(), req, timeout);
+ CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(newLeader.address(), req, rpcTimeout);
return fut.thenCompose(resp -> {
if (resp != null) {
@@ -466,7 +466,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
- return cluster.messagingService().invoke(peer.address(), req, timeout)
+ return cluster.messagingService().invoke(peer.address(), req, rpcTimeout)
.thenApply(resp -> (R) ((ActionResponse) resp).result());
}
@@ -497,14 +497,14 @@ public class RaftGroupServiceImpl implements RaftGroupService {
cluster.topologyService().localMember().address(),
peer.address());
}
-
+
if (currentTimeMillis() >= stopTime) {
fut.completeExceptionally(new TimeoutException());
return;
}
- CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, networkTimeout);
+ CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, rpcTimeout);
//TODO: IGNITE-15389 org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock inside
fut0.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
@@ -516,7 +516,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
peer.address(),
err == null ? null : err.getMessage());
}
-
+
if (err != null) {
if (recoverable(err)) {
executor.schedule(() -> {
@@ -534,7 +534,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
if (resp0.errorCode() == RaftError.SUCCESS.getNumber()) { // Handle OK response.
leader = peer; // The OK response was received from a leader.
-
+
fut.complete(null); // Void response.
}
else if (resp0.errorCode() == RaftError.EBUSY.getNumber() ||
@@ -559,7 +559,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
}
else {
leader = parsePeer(resp0.leaderId()); // Update a leader.
-
+
executor.schedule(() -> {
sendWithRetry(leader, req, stopTime, fut);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ExponentialBackoffTimeoutStrategy.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ExponentialBackoffTimeoutStrategy.java
new file mode 100644
index 0000000..c90ca5e
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ExponentialBackoffTimeoutStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Timeout generation strategy based on exponential backoff.
+ *
+ * <p>During the first {@code roundsWithoutAdjusting} rounds the provided timeout is returned as is. Starting from the next
+ * round the provided timeout is multiplied by a backoff coefficient ({@code 2.0}) and capped by the configured max timeout
+ * (11000 ms when the no-arg constructor is used).
+ */
+public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy {
+    /** Default backoff coefficient to calculate next timeout based on backoff strategy. */
+    private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0;
+
+    /** Default max timeout that strategy could generate, ms. */
+    private static final int DEFAULT_TIMEOUT_MS_MAX = 11_000;
+
+    /** Default number of rounds during which the timeout is not adjusted. */
+    private static final long DEFAULT_ROUNDS_WITHOUT_ADJUSTING = 3;
+
+    /** Max timeout that strategy could generate, ms. */
+    private final int maxTimeout;
+
+    /** Number of rounds during which the timeout is not adjusted; adjusting starts from the round that follows. */
+    private final long roundsWithoutAdjusting;
+
+    /**
+     * Creates a strategy with the default max timeout (11000 ms) and the default number of rounds without adjusting (3).
+     */
+    public ExponentialBackoffTimeoutStrategy() {
+        this(DEFAULT_TIMEOUT_MS_MAX, DEFAULT_ROUNDS_WITHOUT_ADJUSTING);
+    }
+
+    /**
+     * Creates a strategy.
+     *
+     * @param maxTimeout Max timeout that strategy could generate, ms. Must be positive.
+     * @param roundsWithoutAdjusting Number of rounds during which the timeout is not adjusted. Must not be negative.
+     * @throws IllegalArgumentException If {@code maxTimeout} is not positive or {@code roundsWithoutAdjusting} is negative.
+     */
+    public ExponentialBackoffTimeoutStrategy(int maxTimeout, long roundsWithoutAdjusting) {
+        // A non-positive cap would make the strategy produce zero or negative timeouts.
+        if (maxTimeout <= 0) {
+            throw new IllegalArgumentException("maxTimeout must be positive: " + maxTimeout);
+        }
+
+        if (roundsWithoutAdjusting < 0) {
+            throw new IllegalArgumentException("roundsWithoutAdjusting must not be negative: " + roundsWithoutAdjusting);
+        }
+
+        this.maxTimeout = maxTimeout;
+        this.roundsWithoutAdjusting = roundsWithoutAdjusting;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int nextTimeout(int currentTimeout, long round) {
+        if (round <= roundsWithoutAdjusting) {
+            return currentTimeout;
+        }
+
+        return backoffTimeout(currentTimeout, maxTimeout);
+    }
+
+    /**
+     * Multiplies the timeout by the backoff coefficient ({@code 2.0}) and caps the result.
+     *
+     * @param timeout Timeout.
+     * @param maxTimeout Maximum timeout for backoff function.
+     * @return Next exponential backoff timeout, never greater than {@code maxTimeout}.
+     */
+    public static int backoffTimeout(int timeout, int maxTimeout) {
+        // The product is computed as a double, so it cannot overflow; it is capped before narrowing back to int.
+        return (int) Math.min(timeout * DEFAULT_BACKOFF_COEFFICIENT, maxTimeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(ExponentialBackoffTimeoutStrategy.class, this);
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/NoopTimeoutStrategy.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/NoopTimeoutStrategy.java
new file mode 100644
index 0000000..3a655bf
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/NoopTimeoutStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+/**
+ * Timeout strategy that never adjusts the timeout: {@link #nextTimeout(int, long)} always returns the current timeout,
+ * regardless of the round.
+ */
+public class NoopTimeoutStrategy implements TimeoutStrategy {
+    /** {@inheritDoc} */
+    @Override
+    public int nextTimeout(int currentTimeout, long round) {
+        return currentTimeout;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/TimeoutStrategy.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/TimeoutStrategy.java
new file mode 100644
index 0000000..6c2e8b8
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/TimeoutStrategy.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+/**
+ * Computes the next value of a timeout, for example an election timeout that may grow after repeated unsuccessful rounds.
+ */
+public interface TimeoutStrategy {
+    /**
+     * Returns the timeout to use for the given round.
+     *
+     * @param currentTimeout Timeout currently in use.
+     * @param round Number of the round the next timeout is requested for.
+     * @return Timeout to use from now on; may be equal to {@code currentTimeout}.
+     */
+    int nextTimeout(int currentTimeout, long round);
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 7e07c49..d783be2 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -39,6 +39,7 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
@@ -234,6 +235,8 @@ public class TestCluster {
nodeOptions.setTimerPoolSize(Utils.cpus() * 2);
nodeOptions.setRpcProcessorThreadPoolSize(Utils.cpus() * 3);
+ nodeOptions.setElectionTimeoutStrategy(new ExponentialBackoffTimeoutStrategy());
+
MockStateMachine fsm = new MockStateMachine(listenAddr);
nodeOptions.setFsm(fsm);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ExponentialBackoffTimeoutStrategyTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ExponentialBackoffTimeoutStrategyTest.java
new file mode 100644
index 0000000..be37a9b
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/ExponentialBackoffTimeoutStrategyTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for {@link ExponentialBackoffTimeoutStrategy}.
+ */
+public class ExponentialBackoffTimeoutStrategyTest {
+    /**
+     * Checks that the timeout stays unchanged during the first {@code roundsWithoutAdjusting} rounds, is doubled afterwards
+     * and never exceeds the max timeout.
+     */
+    @Test
+    public void testNextTimeout() {
+        int initialTimeout = 2000;
+
+        int maxTimeout = 11_000;
+
+        int roundsWithoutAdjusting = 2;
+
+        TimeoutStrategy timeoutStrategy = new ExponentialBackoffTimeoutStrategy(maxTimeout, roundsWithoutAdjusting);
+
+        // No adjusting while round <= roundsWithoutAdjusting.
+        assertEquals(initialTimeout, timeoutStrategy.nextTimeout(initialTimeout, 1));
+        assertEquals(initialTimeout, timeoutStrategy.nextTimeout(initialTimeout, roundsWithoutAdjusting));
+
+        // Default backoff coefficient equals 2.
+        assertEquals(2 * initialTimeout, timeoutStrategy.nextTimeout(initialTimeout, roundsWithoutAdjusting + 1));
+        assertEquals(2 * initialTimeout, timeoutStrategy.nextTimeout(initialTimeout, roundsWithoutAdjusting + 2));
+        assertEquals(2 * 2 * initialTimeout, timeoutStrategy.nextTimeout(2 * initialTimeout, roundsWithoutAdjusting + 2));
+
+        // The result is capped by the max timeout, even if the current timeout already exceeds it.
+        assertEquals(maxTimeout, timeoutStrategy.nextTimeout(2 * 2 * initialTimeout, roundsWithoutAdjusting + 2));
+        assertEquals(maxTimeout, timeoutStrategy.nextTimeout(100_500, roundsWithoutAdjusting + 2));
+    }
+}