You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2022/03/14 15:59:27 UTC

[ignite-3] branch main updated: IGNITE-16678 RaftGroupService#transferLeadership fixed.

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7566234  IGNITE-16678 RaftGroupService#transferLeadership fixed.
7566234 is described below

commit 75662340e98bdabb3ce229ab242c7a3b8dc7cdea
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Mon Mar 14 18:59:21 2022 +0300

    IGNITE-16678 RaftGroupService#transferLeadership fixed.
    
    IGNITE-16678 RaftGroupService#transferLeadership fixed.
---
 .../internal/raft/ItRaftGroupServiceTest.java      | 162 +++++++++++++++++++++
 .../raft/jraft/rpc/impl/RaftGroupServiceImpl.java  |   9 +-
 .../raft/jraft/core/RaftGroupServiceTest.java      |   3 +-
 3 files changed, 170 insertions(+), 4 deletions(-)

diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
new file mode 100644
index 0000000..521ad2f
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
+import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
+import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration test methods of raft group service.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class ItRaftGroupServiceTest {
+    @WorkDirectory
+    private static Path workDir;
+
+    private static final int NODES_CNT = 2;
+
+    private static final int NODE_PORT_BASE = 20_000;
+
+    private static final TestScaleCubeClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+
+    private static final String RAFT_GROUP_NAME = "part1";
+
+    private static List<ClusterService> clusterServices = new ArrayList<>();
+
+    private static List<Loza> raftSrvs = new ArrayList<>();
+
+    private static Map<ClusterNode, RaftGroupService> raftGroups = new HashMap<>();
+
+    @BeforeAll
+    public static void beforeAll(TestInfo testInfo) throws Exception {
+        List<NetworkAddress> localAddresses = findLocalAddresses(NODE_PORT_BASE,
+                NODE_PORT_BASE + NODES_CNT);
+
+        var nodeFinder = new StaticNodeFinder(localAddresses);
+
+        for (int i = 0; i < NODES_CNT; i++) {
+            ClusterService clusterService = ClusterServiceTestUtils.clusterService(
+                    testInfo,
+                    NODE_PORT_BASE + i,
+                    nodeFinder,
+                    NETWORK_FACTORY
+            );
+
+            clusterServices.add(clusterService);
+
+            clusterService.start();
+        }
+
+        assertTrue(waitForTopology(clusterServices.get(NODES_CNT - 1), NODES_CNT, 1000));
+
+        List<ClusterNode> nodes = clusterServices.stream().map(cs -> cs.topologyService().localMember()).collect(Collectors.toList());
+
+        CompletableFuture<RaftGroupService>[] svcFutures = new CompletableFuture[NODES_CNT];
+
+        for (int i = 0; i < NODES_CNT; i++) {
+            Loza raftServer = new Loza(clusterServices.get(i), workDir);
+
+            raftSrvs.add(raftServer);
+
+            raftServer.start();
+
+            CompletableFuture<RaftGroupService> raftGroupServiceFuture = raftSrvs.get(i).prepareRaftGroup(
+                    RAFT_GROUP_NAME,
+                    nodes,
+                    () -> mock(RaftGroupListener.class)
+            );
+
+            svcFutures[i] = raftGroupServiceFuture;
+        }
+
+        CompletableFuture.allOf(svcFutures).get();
+
+        for (int i = 0; i < NODES_CNT; i++) {
+            raftGroups.put(clusterServices.get(i).topologyService().localMember(), svcFutures[i].get());
+        }
+    }
+
+    @AfterAll
+    public static void afterAll() throws Exception {
+        raftGroups.values().forEach(RaftGroupService::shutdown);
+
+        for (Loza raftSrv : raftSrvs) {
+            raftSrv.stopRaftGroup(RAFT_GROUP_NAME);
+            raftSrv.stop();
+        }
+
+        clusterServices.stream().forEach(ClusterService::stop);
+    }
+
+    @Test
+    @Timeout(20)
+    public void testTransferLeadership() throws Exception {
+        RaftGroupService raftGroupService = raftGroups.get(clusterServices.get(0).topologyService().localMember());
+
+        while (raftGroupService.leader() == null) {
+            raftGroupService.refreshLeader().get();
+        }
+
+        ClusterNode oldLeaderNode = raftGroups.keySet().stream()
+                .filter(clusterNode -> new Peer(clusterNode.address()).equals(raftGroupService.leader())).findFirst().get();
+
+        ClusterNode newLeaderNode = raftGroups.keySet().stream()
+                .filter(clusterNode -> !new Peer(clusterNode.address()).equals(raftGroupService.leader())).findFirst().get();
+
+        Peer expectedNewLeaderPeer = new Peer(newLeaderNode.address());
+
+        raftGroups.get(oldLeaderNode).transferLeadership(expectedNewLeaderPeer).get();
+
+        assertTrue(waitForCondition(() -> expectedNewLeaderPeer.equals(raftGroups.get(oldLeaderNode).leader()), 10_000));
+
+        assertTrue(waitForCondition(() -> {
+            raftGroups.get(newLeaderNode).refreshLeader().join();
+            return expectedNewLeaderPeer.equals(raftGroups.get(newLeaderNode).leader());
+        }, 10_000));
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 42b1488..ed314f0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -422,16 +422,19 @@ public class RaftGroupServiceImpl implements RaftGroupService {
             return refreshLeader().thenCompose(res -> transferLeadership(newLeader));
 
         TransferLeaderRequest req = factory.transferLeaderRequest()
-            .groupId(groupId).leaderId(PeerId.fromPeer(newLeader).toString()).build();
+                .groupId(groupId)
+                .leaderId(PeerId.fromPeer(leader).toString())
+                .peerId(PeerId.fromPeer(newLeader).toString())
+                .build();
 
-        CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(newLeader.address(), req, rpcTimeout);
+        CompletableFuture<NetworkMessage> fut = cluster.messagingService().invoke(leader.address(), req, rpcTimeout);
 
         return fut.thenCompose(resp -> {
             if (resp != null) {
                 RpcRequests.ErrorResponse resp0 = (RpcRequests.ErrorResponse) resp;
 
                 if (resp0.errorCode() != RaftError.SUCCESS.getNumber())
-                    CompletableFuture.failedFuture(
+                    return CompletableFuture.failedFuture(
                         new RaftException(
                             RaftError.forNumber(resp0.errorCode()), resp0.errorMsg()
                         )
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
index 603fb9a..9202138 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
@@ -630,7 +630,8 @@ public class RaftGroupServiceTest {
 
         when(messagingService.invoke(any(NetworkAddress.class),
             eq(FACTORY.transferLeaderRequest()
-                .leaderId(PeerId.fromPeer(NODES.get(1)).toString())
+                .peerId(PeerId.fromPeer(NODES.get(1)).toString())
+                .leaderId(PeerId.fromPeer(NODES.get(0)).toString())
                 .groupId(groupId).build()), anyLong()))
             .then(invocation ->
                 completedFuture(RaftRpcFactory.DEFAULT.newResponse(FACTORY, Status.OK())));