Posted to notifications@ignite.apache.org by "alievmirza (via GitHub)" <gi...@apache.org> on 2023/06/27 16:45:12 UTC

[GitHub] [ignite-3] alievmirza commented on a diff in pull request #2255: IGNITE-19120 Raft client should get leader metadata along while getting leader itself

alievmirza commented on code in PR #2255:
URL: https://github.com/apache/ignite-3/pull/2255#discussion_r1243939875


##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java:
##########
@@ -41,6 +42,14 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
      */
     PeerId getLeaderId();
 
+    /**
+     * Returns leader with their term concurrently.
+     * If the leader is not known, the method returns {@code null}.
+     *
+     * @return Leader peer with corresponding term.
+     */
+    IgniteBiTuple<PeerId, Long> getLeaderWithTer();

Review Comment:
   Typo in the method name: `getLeaderWithTer` should be `getLeaderWithTerm`.
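
The point of such a method is that the leader id and the term come from one consistent snapshot, rather than from two separate calls that could straddle an election. A minimal sketch, assuming a node that guards both fields with a single read-write lock. `LeaderState`, `onLeaderElected`, and the nested `LeaderWithTerm` holder (standing in for `IgniteBiTuple<PeerId, Long>`, with a `String` in place of `PeerId`) are hypothetical simplifications, not JRaft's `NodeImpl`:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical holder of raft leadership state; not the actual JRaft NodeImpl. */
final class LeaderState {
    /** Stand-in for IgniteBiTuple<PeerId, Long>. */
    static final class LeaderWithTerm {
        final String leaderId;
        final long term;

        LeaderWithTerm(String leaderId, long term) {
            this.leaderId = leaderId;
            this.term = term;
        }
    }

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private String leaderId; // null while no leader is known
    private long currTerm;

    /** Updates leader and term together, e.g. when a new leader is elected. */
    void onLeaderElected(String newLeaderId, long newTerm) {
        lock.writeLock().lock();
        try {
            leaderId = newLeaderId;
            currTerm = newTerm;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Returns leader and term read under the same lock, or null if no leader is known. */
    LeaderWithTerm getLeaderWithTerm() {
        lock.readLock().lock();
        try {
            return leaderId == null ? null : new LeaderWithTerm(leaderId, currTerm);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Because both fields are written under the write lock and read under the read lock, a caller can never observe a new leader paired with the previous term.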



##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java:
##########
@@ -41,4 +41,14 @@ public LeaderWithTerm(@Nullable Peer leader, long term) {
     public long term() {
         return term;
     }
+
+    @Override
+    public String toString() {

Review Comment:
   Could we use `S.toString(LeaderWithTerm.class, this)` here?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -69,11 +69,8 @@ public class Replica {
     /** Instance of the local node. */
     private final ClusterNode localNode;
 
-    // TODO IGNITE-19120 after replica inoperability logic is introduced, this future should be replaced with something like
-    //     VersionedValue (so that PlacementDriverMessages would wait for new leader election)
-    private CompletableFuture<AtomicReference<ClusterNode>> leaderFuture = new CompletableFuture<>();
-
-    private AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+    // TODO:IGNITE-19120 Raft client should get leader metadata along while getting leader itself

Review Comment:
   This TODO references the current ticket (IGNITE-19120), which this PR is supposed to resolve.



##########
modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java:
##########
@@ -347,6 +356,83 @@ public void testNotificationToPlacementDriverAboutMajorityLoss() throws Exceptio
         stopReplicationGroup(GROUP_ID, grpNodes);
     }
 
+    @Test
+    @Disabled("IGNITE-19120 Raft client should get leader metadata along while getting leader itself")
+    public void testLeaseGrantWhenMajorityLoss() throws Exception {
+        Set<String> grpNodes = chooseRandomNodes(3);
+
+        log.info("Replication group is based on {}", grpNodes);
+        log.info("Placement driver is based on {}", placementDriverNodeNames);
+
+        var raftClientFut = createReplicationGroup(GROUP_ID, grpNodes);
+
+        var raftClient = raftClientFut.get();
+
+        raftClient.refreshLeader().get();
+
+        var leaderNodeName = raftClient.leader().consistentId();
+
+        var clusterService = clusterServices.get(randomPlacementDriverNode(Set.of()));
+
+        var leaseGrantMsgFut = clusterService.messagingService().invoke(
+                clusterService.topologyService().getByConsistentId(leaderNodeName),
+                PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessage()
+                        .groupId(GROUP_ID)
+                        .leaseStartTimeLong(clock.nowLong())
+                        .leaseExpirationTimeLong(new HybridTimestamp(clock.now().getPhysical() + 10_000, 0).longValue())
+                        .build(),
+                2_000
+        );
+
+        assertThat(leaseGrantMsgFut, willCompleteSuccessfully());
+
+        LeaseGrantedMessageResponse leaseGrantResp = (LeaseGrantedMessageResponse) leaseGrantMsgFut.get();
+
+        assertTrue(leaseGrantResp.accepted());
+        assertNull(leaseGrantResp.redirectProposal());
+
+        var grpNodesToStop = grpNodes.stream().filter(n -> !n.equals(leaderNodeName)).collect(toSet());
+
+        log.info(
+                "All nodes of the replication group will be unavailable except leader [leader={}, others={}]",
+                leaderNodeName,
+                grpNodesToStop
+        );
+
+        for (String nodeToStop : grpNodesToStop) {
+            var srvc = clusterServices.get(nodeToStop);
+
+            srvc.beforeNodeStop();
+            srvc.stop();
+        }
+
+        clusterService = clusterServices.get(randomPlacementDriverNode(grpNodesToStop));
+
+        log.info(
+                "Placement driver node tries to prolong a lease [pdNode={}, grpLeader={}]",
+                clusterService.topologyService().localMember(),
+                leaderNodeName
+        );
+
+        var prolongLeaseFut = clusterService.messagingService().invoke(
+                clusterService.topologyService().getByConsistentId(leaderNodeName),
+                PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessage()
+                        .groupId(GROUP_ID)
+                        .leaseStartTimeLong(clock.nowLong())
+                        .leaseExpirationTimeLong(new HybridTimestamp(clock.now().getPhysical() + 10_000, 0).longValue())
+                        .build(),
+                1_000
+        );
+
+        Thread.sleep(2_000);

Review Comment:
   I don't think a fixed `Thread.sleep` is a good solution here.
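
A fixed sleep makes a test slow when the expected state arrives early and flaky when it arrives late. A common alternative is to poll for the condition the test is actually waiting for, with an upper bound. A minimal sketch with a hypothetical `WaitUtils.waitForCondition` helper (Ignite's test framework may already provide an equivalent, so check before adding one):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: polls a condition instead of sleeping for a fixed period. */
final class WaitUtils {
    private WaitUtils() {
    }

    /**
     * Polls {@code condition} every {@code pollMs} milliseconds until it holds or {@code timeoutMs} elapses.
     *
     * @return true if the condition became true before the deadline, false on timeout or interruption.
     */
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);

        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }

            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }

            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

When the condition is just completion of a future such as `prolongLeaseFut`, waiting on the future itself with a timeout (for example `prolongLeaseFut.get(5, TimeUnit.SECONDS)`, or a matcher like the `willCompleteSuccessfully()` already used earlier in this test) avoids the helper entirely.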



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -227,12 +223,26 @@ private CompletableFuture<LeaseGrantedMessageResponse> acceptLease(
         return completedFuture(resp);
     }
 
-    private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(ClusterNode groupLeader) {
-        LOG.info("Proposing lease redirection, proposed node=" + groupLeader);
+    /**
+     * Checks whether this exception is caused by a timeout or a connectivity issue.
+     *
+     * @param ex An exception.
+     * @return True if this exception was thrown due to a timeout or a connection problem, false otherwise.
+     */
+    private static boolean isConnectivityRelatedException(Throwable ex) {

Review Comment:
   This method is not used anywhere.
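
If the method is kept and wired in, the check its javadoc describes amounts to walking the cause chain, since futures usually wrap the original failure in a `CompletionException` or `ExecutionException`. A minimal sketch, assuming that "connectivity" means a `java.util.concurrent.TimeoutException` or a `java.net.ConnectException` somewhere in the chain; the class name `ConnectivityErrors` is hypothetical, and the PR may rely on Ignite-specific exception types instead:

```java
import java.net.ConnectException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.TimeoutException;

final class ConnectivityErrors {
    private ConnectivityErrors() {
    }

    /** Returns true if ex or any of its causes is a timeout or a connection failure (assumed set of types). */
    static boolean isConnectivityRelatedException(Throwable ex) {
        // Tracks visited throwables to guard against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());

        for (Throwable t = ex; t != null && seen.add(t); t = t.getCause()) {
            if (t instanceof TimeoutException || t instanceof ConnectException) {
                return true;
            }
        }

        return false;
    }
}
```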



##########
modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java:
##########
@@ -347,6 +356,83 @@ public void testNotificationToPlacementDriverAboutMajorityLoss() throws Exceptio
         stopReplicationGroup(GROUP_ID, grpNodes);
     }
 
+    @Test
+    @Disabled("IGNITE-19120 Raft client should get leader metadata along while getting leader itself")

Review Comment:
   Why is this test disabled with a reference to the current ticket (IGNITE-19120)?



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java:
##########
@@ -1739,6 +1742,53 @@ public void handleReadIndexRequest(final ReadIndexRequest request,
         }
     }
 
+    @Override
+    public void handleReadLeaderIndexRequest(
+            ReadLeaderMetadataRequest request,
+            RpcResponseClosure<ReadLeaderMetadataResponse> done
+    ) {
+        long startMs = Utils.monotonicMs();
+
+        this.readLock.lock();
+
+        try {
+            switch (this.state) {
+                case STATE_LEADER:
+                    readLeader(
+                            null,
+                            new RpcResponseClosureAdapter<>() {
+                                @Override
+                                public void run(Status status) {
+                                    if (getResponse() != null) {
+                                        done.setResponse(raftOptions.getRaftMessagesFactory().readLeaderMetadataResponse()
+                                                .leaderId(leaderId.toString())
+                                                .currentTerm(currTerm)
+                                                .index(getResponse().index())
+                                                .build());
+                                    }
+
+                                    done.run(status);
+                                }
+                            }
+                    );
+                    break;
+
+                case STATE_TRANSFERRING:
+                    done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
+                    break;
+
+                default:
+                    done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
+                    break;
+            }
+        }
+        finally {
+            this.readLock.unlock();
+            this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);

Review Comment:
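   Copy-paste: the latency metric name `handle-read-index` is taken from `handleReadIndexRequest`; this handler should record its own metric.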



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -177,35 +166,42 @@ public CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage
     public CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) {
         LOG.info("Received LeaseGrantedMessage for replica belonging to group=" + groupId() + ", force=" + msg.force());
 
-        return leaderFuture().thenCompose(leader -> {
-            HybridTimestamp leaseExpirationTime = this.leaseExpirationTime;
-
-            if (leaseExpirationTime != null) {
-                assert msg.leaseExpirationTime().after(leaseExpirationTime) : "Invalid lease expiration time in message, msg=" + msg;
-            }
+        return readyMajority.thenCompose(unused -> raftClient.readLeaderMetadata()).thenCompose(leaderMetadata -> {

Review Comment:
   What if the majority is lost at the moment of the call?
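
One way to address this is to bound the wait for leader metadata and treat a timeout or failure as "no majority", replying with a rejection instead of leaving the placement driver's request hanging. A minimal sketch, assuming Java 9+ `CompletableFuture#orTimeout`; `LeaseDecision`, `decideLease`, and the `Supplier` standing in for `raftClient.readLeaderMetadata()` (reduced here to a future of the leader's index) are hypothetical simplifications, not the PR's code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class LeaseDecision {
    private LeaseDecision() {
    }

    /**
     * Requests the leader's index and decides whether the lease can be accepted.
     * If the metadata does not arrive within timeoutMs, or the request fails (for example because the
     * majority was lost), the lease is rejected instead of waiting indefinitely.
     */
    static CompletableFuture<Boolean> decideLease(Supplier<CompletableFuture<Long>> readLeaderIndex, long timeoutMs) {
        return readLeaderIndex.get()
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                // Metadata obtained: accept. A real replica would first wait for its storage to reach the index.
                .thenApply(index -> true)
                // Timeout or failure: reject, so the placement driver can retry or pick another node.
                .exceptionally(ex -> false);
    }
}
```

Note that `orTimeout` completes the supplied future itself exceptionally on timeout, so any other stage attached to that future also observes the `TimeoutException`.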



##########
modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java:
##########
@@ -362,6 +448,22 @@ private String randomNode(Set<String> exceptNodes) {
         return list.get(0);
     }
 
+    /**
+     * Gets placement driver node name randomly.
+     *
+     * @param exceptNodes Nodes to skip.
+     * @return Node name.
+     */
+    private String randomPlacementDriverNode(Set<String> exceptNodes) {
+        ArrayList<String> list = new ArrayList<>(placementDriverNodeNames);
+
+        list.removeAll(exceptNodes);
+
+        Collections.shuffle(list);

Review Comment:
   Why do we need to shuffle the whole list if we can just pick an element at a random index?
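
Picking one element only needs one random index; shuffling rearranges the entire copy for no benefit. A minimal sketch of the helper written that way, taking the candidate names as a parameter instead of reading the `placementDriverNodeNames` field (the class `RandomNodes` is a hypothetical refactoring for self-containment):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

final class RandomNodes {
    private RandomNodes() {
    }

    /** Picks a random node name from candidates, skipping exceptNodes. */
    static String randomNode(Collection<String> candidates, Set<String> exceptNodes) {
        List<String> list = new ArrayList<>(candidates);

        list.removeAll(exceptNodes);

        if (list.isEmpty()) {
            throw new IllegalStateException("No nodes left after excluding " + exceptNodes);
        }

        // A single random index instead of shuffling the whole list.
        return list.get(ThreadLocalRandom.current().nextInt(list.size()));
    }
}
```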



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -177,35 +166,42 @@ public CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage
     public CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) {
         LOG.info("Received LeaseGrantedMessage for replica belonging to group=" + groupId() + ", force=" + msg.force());
 
-        return leaderFuture().thenCompose(leader -> {
-            HybridTimestamp leaseExpirationTime = this.leaseExpirationTime;
-
-            if (leaseExpirationTime != null) {
-                assert msg.leaseExpirationTime().after(leaseExpirationTime) : "Invalid lease expiration time in message, msg=" + msg;
-            }
+        return readyMajority.thenCompose(unused -> raftClient.readLeaderMetadata()).thenCompose(leaderMetadata -> {
+            assert leaseExpirationTime == null || msg.leaseExpirationTime().after(leaseExpirationTime) :
+                    "Invalid lease expiration time in message [leaseExpirationTime=" + leaseExpirationTime
+                            + ", msgLeaseExpirationTime=" + msg.leaseExpirationTime() + ']';
 
             if (msg.force()) {
                 // Replica must wait till storage index reaches the current leader's index to make sure that all updates made on the
                 // group leader are received.
-
-                return waitForActualState(msg.leaseExpirationTime().getPhysical())
+                return waitForActualState(leaderMetadata.getIndex())
                         .thenCompose(v -> {
+                            if (msg.leaseExpirationTime().getPhysical() < currentTimeMillis()) {

Review Comment:
   Is this the correct way to compare hybrid timestamps? I was expecting `HybridTimestamp#compareTo` against `clock.now()`.
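
A hybrid timestamp is ordered by physical time first and by its logical counter second. Comparing only the physical part with `currentTimeMillis()` drops the logical component and bypasses the node's hybrid clock, which in a hybrid logical clock scheme can run ahead of local wall-clock time after observing timestamps from other nodes. A minimal sketch of the comparison the comment suggests, using a simplified stand-in class (`SimpleHybridTimestamp` and its field layout are assumptions, not Ignite's `HybridTimestamp`):

```java
/** Simplified stand-in for a hybrid timestamp: physical millis plus a logical counter (assumption). */
final class SimpleHybridTimestamp implements Comparable<SimpleHybridTimestamp> {
    final long physical;
    final int logical;

    SimpleHybridTimestamp(long physical, int logical) {
        this.physical = physical;
        this.logical = logical;
    }

    /** Orders by physical time, then by logical counter. */
    @Override
    public int compareTo(SimpleHybridTimestamp other) {
        int byPhysical = Long.compare(physical, other.physical);

        return byPhysical != 0 ? byPhysical : Integer.compare(logical, other.logical);
    }

    /** True if a lease expiring at leaseExpiration has already expired at hybrid time now. */
    static boolean isExpired(SimpleHybridTimestamp leaseExpiration, SimpleHybridTimestamp now) {
        return leaseExpiration.compareTo(now) < 0;
    }
}
```

With an expiration of (100, 0) and a current hybrid time of (100, 5), this reports the lease as expired, whereas a physical-only `100 < 100` check would not. In the PR, the equivalent check would presumably be `msg.leaseExpirationTime().compareTo(clock.now()) < 0`, assuming `clock` is the replica's hybrid clock.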



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org