You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/12/21 10:47:47 UTC
[ignite-3] 02/06: wip. fix leader await
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-18323
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 6ad693c5ba4cdcb99d8bbd5f0e797a3ca0644e1f
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Dec 20 11:47:30 2022 +0300
wip. fix leader await
---
.../ignite/internal/raft/RaftGroupServiceImpl.java | 158 ++++++++++++++-------
1 file changed, 105 insertions(+), 53 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 659c79370f..cfd1817392 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.raft;
-import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.ThreadLocalRandom.current;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
@@ -44,6 +43,7 @@ import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkMessage;
@@ -96,6 +97,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Nullable
private volatile Peer leader;
+ private volatile CompletableFuture<Peer> leaderFuture;
+
private volatile List<Peer> peers;
private volatile List<Peer> learners;
@@ -253,10 +256,36 @@ public class RaftGroupServiceImpl implements RaftGroupService {
@Override
public CompletableFuture<Void> refreshLeader() {
+ return refreshLeader0().thenAccept(peer -> {
+ });
+ }
+
+ public CompletableFuture<Peer> refreshLeader0() {
+ CompletableFuture<Peer> fut;
+ synchronized (this) {
+ if (leaderFuture != null) {
+ return leaderFuture;
+ }
+
+ leader = null;
+ leaderFuture = new CompletableFuture<>();
+ fut = leaderFuture;
+ }
+
GetLeaderRequest req = factory.getLeaderRequest().groupId(groupId).build();
- return this.<GetLeaderResponse>sendWithRetry(randomNode(), req)
- .thenAccept(resp -> this.leader = parsePeer(resp.leaderId()));
+ this.<GetLeaderResponse>sendWithRetry(randomNode(), req)
+ .whenComplete((resp, th) -> {
+ if (th != null) {
+ fut.completeExceptionally(th);
+ } else {
+ Peer peer = parsePeer(resp.leaderId());
+ assert peer != null;
+ onLeaderChanged(peer);
+ }
+ });
+
+ return fut;
}
@Override
@@ -435,7 +464,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
.build();
return sendWithRetry(leader, req)
- .thenRun(() -> this.leader = newLeader);
+ .thenRun(() -> onLeaderChanged(newLeader));
}
@Override
@@ -465,7 +494,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(Peer peer, NetworkMessage req) {
var future = new CompletableFuture<R>();
- sendWithRetry(peer, req, currentTimeMillis() + timeout, future);
+ sendWithRetry(peer, req, Long.MAX_VALUE, future);
return future;
}
@@ -480,8 +509,18 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @param <R> Response type.
*/
private <R extends NetworkMessage> void sendWithRetry(Peer peer, NetworkMessage req, long stopTime, CompletableFuture<R> fut) {
- if (currentTimeMillis() >= stopTime) {
- fut.completeExceptionally(new TimeoutException());
+ if (cluster.isStopped()) {
+ fut.completeExceptionally(new NodeStoppingException());
+
+ return;
+ }
+
+ if (peer == null) {
+ if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+ scheduleRetry(() -> sendWithRetry(randomNode(), req, stopTime, fut));
+ } else {
+ refreshLeader0().thenAcceptAsync(peer0 -> sendWithRetry(peer0, req, stopTime, fut), executor);
+ }
return;
}
@@ -505,8 +544,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
} else if (resp instanceof SMErrorResponse) {
handleSmErrorResponse((SMErrorResponse) resp, fut);
} else {
- leader = peer; // The OK response was received from a leader.
-
fut.complete((R) resp);
}
});
@@ -516,12 +553,16 @@ public class RaftGroupServiceImpl implements RaftGroupService {
Throwable err, Peer peer, NetworkMessage req, long stopTime, CompletableFuture<? extends NetworkMessage> fut
) {
if (recoverable(err)) {
- LOG.warn(
- "Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ",
- err, req.getClass().getSimpleName()
+ LOG.info(
+ "Recoverable error during the request type={} occurred (will be retried on the randomly selected node): group={}",
+ req.getClass().getSimpleName(), groupId
);
- scheduleRetry(() -> sendWithRetry(randomNode(peer), req, stopTime, fut));
+ if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+ scheduleRetry(() -> sendWithRetry(randomNode(), req, stopTime, fut));
+ } else {
+ refreshLeader0().thenAcceptAsync(peer0 -> sendWithRetry(peer0, req, stopTime, fut), executor);
+ }
} else {
fut.completeExceptionally(err);
}
@@ -534,28 +575,17 @@ public class RaftGroupServiceImpl implements RaftGroupService {
switch (error) {
case SUCCESS:
- leader = peer; // The OK response was received from a leader.
-
fut.complete(null); // Void response.
break;
case EBUSY:
case EAGAIN:
- scheduleRetry(() -> sendWithRetry(peer, req, stopTime, fut));
-
- break;
-
- case ENOENT:
- scheduleRetry(() -> {
- // If changing peers or requesting a leader and something is not found
- // probably target peer is doing rebalancing, try another peer.
- if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
- sendWithRetry(randomNode(peer), req, stopTime, fut);
- } else {
- sendWithRetry(peer, req, stopTime, fut);
- }
- });
+ if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+ scheduleRetry(() -> sendWithRetry(randomNode(), req, stopTime, fut));
+ } else {
+ scheduleRetry(() -> sendWithRetry(peer, req, stopTime, fut));
+ }
break;
@@ -563,12 +593,24 @@ public class RaftGroupServiceImpl implements RaftGroupService {
// TODO: IGNITE-15706
case UNKNOWN:
case EINTERNAL:
- if (resp.leaderId() == null) {
- scheduleRetry(() -> sendWithRetry(randomNode(peer), req, stopTime, fut));
- } else {
- leader = parsePeer(resp.leaderId()); // Update a leader.
+ Peer leader0 = parsePeer(resp.leaderId());
+
+ if (leader0 != null) {
+ onLeaderChanged(leader0);
+
+ scheduleRetry(() -> sendWithRetry(leader0, req, stopTime, fut));
+
+ break;
+ }
- scheduleRetry(() -> sendWithRetry(leader, req, stopTime, fut));
+ // fall-through
+ case ENOENT:
+ // If changing peers or requesting a leader and something is not found
+ // probably target peer is doing rebalancing, try another peer.
+ if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+ scheduleRetry(() -> sendWithRetry(randomNode(), req, stopTime, fut));
+ } else {
+ refreshLeader0().thenAcceptAsync(peer0 -> sendWithRetry(peer0, req, stopTime, fut), executor);
}
break;
@@ -580,6 +622,25 @@ public class RaftGroupServiceImpl implements RaftGroupService {
}
}
+ private void onLeaderChanged(Peer leader) {
+ Objects.requireNonNull(leader);
+
+ CompletableFuture<Peer> leaderFuture0;
+
+ synchronized (this) {
+ this.leader = leader; // Update a leader.
+
+ if (leaderFuture == null) {
+ return;
+ }
+
+ leaderFuture0 = leaderFuture;
+ leaderFuture = null;
+ }
+
+ leaderFuture0.complete(leader);
+ }
+
private static void handleSmErrorResponse(SMErrorResponse resp, CompletableFuture<? extends NetworkMessage> fut) {
SMThrowable th = resp.error();
@@ -623,36 +684,23 @@ public class RaftGroupServiceImpl implements RaftGroupService {
}
private Peer randomNode() {
- return randomNode(null);
- }
-
- /**
- * Returns a random peer. Tries 5 times finding a peer different from the excluded peer. If excluded peer is null, just returns a random
- * peer.
- *
- * @param excludedPeer Excluded peer.
- * @return Random peer.
- */
- private Peer randomNode(@Nullable Peer excludedPeer) {
List<Peer> peers0 = peers;
assert peers0 != null && !peers0.isEmpty();
- int lastPeerIndex = excludedPeer == null ? -1 : peers0.indexOf(excludedPeer);
-
ThreadLocalRandom random = current();
- int newIdx = 0;
+ int startIdx = random.nextInt(peers0.size());
- for (int retries = 0; retries < 5; retries++) {
- newIdx = random.nextInt(peers0.size());
+ for (int i = 0; i < peers0.size(); i++) {
+ int idx = (startIdx + i) % peers0.size();
- if (newIdx != lastPeerIndex) {
- break;
+ if (cluster.topologyService().getByConsistentId(peers0.get(idx).consistentId()) != null) {
+ return peers0.get(idx);
}
}
- return peers0.get(newIdx);
+ return null;
}
/**
@@ -662,6 +710,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @return Peer
*/
private static @Nullable Peer parsePeer(@Nullable String peerId) {
+ if (peerId == null) {
+ return null;
+ }
+
PeerId id = PeerId.parsePeer(peerId);
return id == null ? null : new Peer(id.getConsistentId());