You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/12/21 10:47:47 UTC

[ignite-3] 02/06: wip. fix leader await

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-18323
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 6ad693c5ba4cdcb99d8bbd5f0e797a3ca0644e1f
Author: amashenkov <an...@gmail.com>
AuthorDate: Tue Dec 20 11:47:30 2022 +0300

    wip. fix leader await
---
 .../ignite/internal/raft/RaftGroupServiceImpl.java | 158 ++++++++++++++-------
 1 file changed, 105 insertions(+), 53 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 659c79370f..cfd1817392 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.raft;
 
-import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.ThreadLocalRandom.current;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
@@ -44,6 +43,7 @@ import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkMessage;
@@ -96,6 +97,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     @Nullable
     private volatile Peer leader;
 
+    private volatile CompletableFuture<Peer> leaderFuture;
+
     private volatile List<Peer> peers;
 
     private volatile List<Peer> learners;
@@ -253,10 +256,36 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
     @Override
     public CompletableFuture<Void> refreshLeader() {
+        return refreshLeader0().thenAccept(peer -> {
+        });
+    }
+
+    public CompletableFuture<Peer> refreshLeader0() {
+        CompletableFuture<Peer> fut;
+        synchronized (this) {
+            if (leaderFuture != null) {
+                return leaderFuture;
+            }
+
+            leader = null;
+            leaderFuture = new CompletableFuture<>();
+            fut = leaderFuture;
+        }
+
         GetLeaderRequest req = factory.getLeaderRequest().groupId(groupId).build();
 
-        return this.<GetLeaderResponse>sendWithRetry(randomNode(), req)
-                .thenAccept(resp -> this.leader = parsePeer(resp.leaderId()));
+        this.<GetLeaderResponse>sendWithRetry(randomNode(), req)
+                .whenComplete((resp, th) -> {
+                    if (th != null) {
+                        fut.completeExceptionally(th);
+                    } else {
+                        Peer peer = parsePeer(resp.leaderId());
+                        assert peer != null;
+                        onLeaderChanged(peer);
+                    }
+                });
+
+        return fut;
     }
 
     @Override
@@ -435,7 +464,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                 .build();
 
         return sendWithRetry(leader, req)
-                .thenRun(() -> this.leader = newLeader);
+                .thenRun(() -> onLeaderChanged(newLeader));
     }
 
     @Override
@@ -465,7 +494,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(Peer peer, NetworkMessage req) {
         var future = new CompletableFuture<R>();
 
-        sendWithRetry(peer, req, currentTimeMillis() + timeout, future);
+        sendWithRetry(peer, req, Long.MAX_VALUE, future);
 
         return future;
     }
@@ -480,8 +509,18 @@ public class RaftGroupServiceImpl implements RaftGroupService {
      * @param <R> Response type.
      */
     private <R extends NetworkMessage> void sendWithRetry(Peer peer, NetworkMessage req, long stopTime, CompletableFuture<R> fut) {
-        if (currentTimeMillis() >= stopTime) {
-            fut.completeExceptionally(new TimeoutException());
+        if (cluster.isStopped()) {
+            fut.completeExceptionally(new NodeStoppingException());
+
+            return;
+        }
+
+        if (peer == null) {
+            if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+                scheduleRetry(() -> sendWithRetry(randomNode(), req, stopTime, fut));
+            } else {
+                refreshLeader0().thenAcceptAsync(peer0 -> sendWithRetry(peer0, req, stopTime, fut), executor);
+            }
 
             return;
         }
@@ -505,8 +544,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                     } else if (resp instanceof SMErrorResponse) {
                         handleSmErrorResponse((SMErrorResponse) resp, fut);
                     } else {
-                        leader = peer; // The OK response was received from a leader.
-
                         fut.complete((R) resp);
                     }
                 });
@@ -516,12 +553,16 @@ public class RaftGroupServiceImpl implements RaftGroupService {
             Throwable err, Peer peer, NetworkMessage req, long stopTime, CompletableFuture<? extends NetworkMessage> fut
     ) {
         if (recoverable(err)) {
-            LOG.warn(
-                    "Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ",
-                    err, req.getClass().getSimpleName()
+            LOG.info(
+                    "Recoverable error during the request type={} occurred (will be retried on the randomly selected node): group={}",
+                    req.getClass().getSimpleName(), groupId
             );
 
-            scheduleRetry(() -> sendWithRetry(randomNode(peer), req, stopTime, fut));
+            if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+                scheduleRetry(() -> sendWithRetry(randomNode(), req, stopTime, fut));
+            } else {
+                refreshLeader0().thenAcceptAsync(peer0 -> sendWithRetry(peer0, req, stopTime, fut), executor);
+            }
         } else {
             fut.completeExceptionally(err);
         }
@@ -534,28 +575,17 @@ public class RaftGroupServiceImpl implements RaftGroupService {
 
         switch (error) {
             case SUCCESS:
-                leader = peer; // The OK response was received from a leader.
-
                 fut.complete(null); // Void response.
 
                 break;
 
             case EBUSY:
             case EAGAIN:
-                scheduleRetry(() -> sendWithRetry(peer, req, stopTime, fut));
-
-                break;
-
-            case ENOENT:
-                scheduleRetry(() -> {
-                    // If changing peers or requesting a leader and something is not found
-                    // probably target peer is doing rebalancing, try another peer.
-                    if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
-                        sendWithRetry(randomNode(peer), req, stopTime, fut);
-                    } else {
-                        sendWithRetry(peer, req, stopTime, fut);
-                    }
-                });
+                if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+                    scheduleRetry(() -> sendWithRetry(randomNode(), req, stopTime, fut));
+                } else {
+                    scheduleRetry(() -> sendWithRetry(peer, req, stopTime, fut));
+                }
 
                 break;
 
@@ -563,12 +593,24 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                 // TODO: IGNITE-15706
             case UNKNOWN:
             case EINTERNAL:
-                if (resp.leaderId() == null) {
-                    scheduleRetry(() -> sendWithRetry(randomNode(peer), req, stopTime, fut));
-                } else {
-                    leader = parsePeer(resp.leaderId()); // Update a leader.
+                Peer leader0 = parsePeer(resp.leaderId());
+
+                if (leader0 != null) {
+                    onLeaderChanged(leader0);
+
+                    scheduleRetry(() -> sendWithRetry(leader0, req, stopTime, fut));
+
+                    break;
+                }
 
-                    scheduleRetry(() -> sendWithRetry(leader, req, stopTime, fut));
+                // fall-through
+            case ENOENT:
+                // If changing peers or requesting a leader and something is not found
+                // probably target peer is doing rebalancing, try another peer.
+                if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+                    scheduleRetry(() -> sendWithRetry(randomNode(), req, stopTime, fut));
+                } else {
+                    refreshLeader0().thenAcceptAsync(peer0 -> sendWithRetry(peer0, req, stopTime, fut), executor);
                 }
 
                 break;
@@ -580,6 +622,25 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         }
     }
 
+    private void onLeaderChanged(Peer leader) {
+        Objects.requireNonNull(leader);
+
+        CompletableFuture<Peer> leaderFuture0;
+
+        synchronized (this) {
+            this.leader = leader; // Update a leader.
+
+            if (leaderFuture == null) {
+                return;
+            }
+
+            leaderFuture0 = leaderFuture;
+            leaderFuture = null;
+        }
+
+        leaderFuture0.complete(leader);
+    }
+
     private static void handleSmErrorResponse(SMErrorResponse resp, CompletableFuture<? extends NetworkMessage> fut) {
         SMThrowable th = resp.error();
 
@@ -623,36 +684,23 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     }
 
     private Peer randomNode() {
-        return randomNode(null);
-    }
-
-    /**
-     * Returns a random peer. Tries 5 times finding a peer different from the excluded peer. If excluded peer is null, just returns a random
-     * peer.
-     *
-     * @param excludedPeer Excluded peer.
-     * @return Random peer.
-     */
-    private Peer randomNode(@Nullable Peer excludedPeer) {
         List<Peer> peers0 = peers;
 
         assert peers0 != null && !peers0.isEmpty();
 
-        int lastPeerIndex = excludedPeer == null ? -1 : peers0.indexOf(excludedPeer);
-
         ThreadLocalRandom random = current();
 
-        int newIdx = 0;
+        int startIdx = random.nextInt(peers0.size());
 
-        for (int retries = 0; retries < 5; retries++) {
-            newIdx = random.nextInt(peers0.size());
+        for (int i = 0; i < peers0.size(); i++) {
+            int idx = (startIdx + i) % peers0.size();
 
-            if (newIdx != lastPeerIndex) {
-                break;
+            if (cluster.topologyService().getByConsistentId(peers0.get(idx).consistentId()) != null) {
+                return peers0.get(idx);
             }
         }
 
-        return peers0.get(newIdx);
+        return null;
     }
 
     /**
@@ -662,6 +710,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
      * @return Peer
      */
     private static @Nullable Peer parsePeer(@Nullable String peerId) {
+        if (peerId == null) {
+            return null;
+        }
+
         PeerId id = PeerId.parsePeer(peerId);
 
         return id == null ? null : new Peer(id.getConsistentId());