You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/10/24 13:45:33 UTC

[ignite-3] branch main updated: IGNITE-17941 Add learners to Raft API (#1236)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a8a8bf3c45 IGNITE-17941 Add learners to Raft API (#1236)
a8a8bf3c45 is described below

commit a8a8bf3c45554a0e7929d4505be93857c80db967
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Mon Oct 24 16:45:27 2022 +0300

    IGNITE-17941 Add learners to Raft API (#1236)
---
 .../management/ClusterManagementGroupManager.java  |   1 +
 .../CompletableFutureExceptionMatcher.java         | 102 ++++++
 .../matchers/CompletableFutureMatcher.java         |   2 +-
 .../raft/client/service/RaftGroupService.java      |  16 +-
 .../ignite/internal/raft/ItLearnersTest.java       | 355 +++++++++++++++++++++
 .../java/org/apache/ignite/internal/raft/Loza.java | 134 ++++----
 .../ignite/internal/raft/server/RaftServer.java    |  25 +-
 .../internal/raft/server/impl/JraftServerImpl.java |  15 +-
 .../raft/jraft/rpc/impl/RaftGroupServiceImpl.java  |  22 +-
 .../ignite/raft/jraft/rpc/AbstractRpcTest.java     |   1 +
 .../ignite/raft/jraft/rpc/TestIgniteRpcServer.java |   1 +
 .../{jraft/rpc => messages}/TestMessageGroup.java  |   4 +-
 .../internal/raft/server/impl/RaftServerImpl.java  |   5 +-
 13 files changed, 572 insertions(+), 111 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index ee0eaba79b..040928be05 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -472,6 +472,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
                     .prepareRaftGroup(
                             INSTANCE,
                             resolveNodes(clusterService, nodeNames),
+                            List.of(),
                             () -> {
                                 clusterStateStorage.start();
 
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java
new file mode 100644
index 0000000000..16df6f740d
--- /dev/null
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * {@link Matcher} that waits for the given future to complete exceptionally and then forwards the exception to the nested matcher.
+ */
+public class CompletableFutureExceptionMatcher extends TypeSafeMatcher<CompletableFuture<?>> {
+    /** Timeout in seconds. */
+    private static final int TIMEOUT_SECONDS = 30;
+
+    /** Matcher to forward the exception of the completable future. */
+    private final Matcher<? extends Exception> matcher;
+
+    /**
+     * Constructor.
+     *
+     * @param matcher Matcher to forward the exception of the completable future.
+     */
+    private CompletableFutureExceptionMatcher(Matcher<? extends Exception> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    protected boolean matchesSafely(CompletableFuture<?> item) {
+        try {
+            item.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            return false;
+        } catch (Exception e) {
+            // A get() timeout or interrupt means the future is still incomplete, so it must not be matched as a failure.
+            return item.isCompletedExceptionally() && matcher.matches(unwrapException(e));
+        }
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText("a future that completes with an exception that ").appendDescriptionOf(matcher);
+    }
+
+    @Override
+    protected void describeMismatchSafely(CompletableFuture<?> item, Description mismatchDescription) {
+        if (item.isCompletedExceptionally()) {
+            try {
+                item.join();
+            } catch (Exception e) {
+                mismatchDescription.appendText("was completed exceptionally with ").appendValue(unwrapException(e));
+            }
+        } else if (item.isDone()) {
+            mismatchDescription.appendText("was completed successfully");
+        } else {
+            mismatchDescription.appendText("was not completed");
+        }
+    }
+
+    private static Throwable unwrapException(Exception e) {
+        if (e instanceof ExecutionException || e instanceof CompletionException) {
+            return e.getCause();
+        } else {
+            return e;
+        }
+    }
+
+    /**
+     * Creates a matcher that matches a future that completes exceptionally with an exception matching the nested matcher.
+     */
+    public static CompletableFutureExceptionMatcher willThrow(Matcher<? extends Exception> matcher) {
+        return new CompletableFutureExceptionMatcher(matcher);
+    }
+
+    /**
+     * Creates a matcher that matches a future that completes with an exception of the provided type.
+     */
+    public static CompletableFutureExceptionMatcher willThrow(Class<? extends Exception> cls) {
+        return willThrow(is(instanceOf(cls)));
+    }
+}
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index e294fb524b..d5bab3a906 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -81,7 +81,7 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
     }
 
     /**
-     * A shorter version of {@link #willBe} to be used with some matchers for aesthetical reasons.
+     * A shorter version of {@link #willBe} to be used with some matchers for aesthetic reasons.
      */
     public static <T> CompletableFutureMatcher<T> will(Matcher<T> matcher) {
         return willBe(matcher);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
index d204d7dfd0..e4291b1ddd 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
@@ -25,8 +25,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -51,7 +49,7 @@ public interface RaftGroupService {
     /**
      * Returns group id.
      */
-    @NotNull ReplicationGroupId groupId();
+    ReplicationGroupId groupId();
 
     /**
      * Returns default timeout for the operations in milliseconds.
@@ -240,18 +238,6 @@ public interface RaftGroupService {
      */
     <R> CompletableFuture<R> run(Command cmd);
 
-    /**
-     * Runs a read command on a given peer.
-     *
-     * <p>Read commands can see stale data (in the past).
-     *
-     * @param peer Peer id.
-     * @param cmd  The command.
-     * @param <R>  Execution result type.
-     * @return A future with the execution result.
-     */
-    <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd);
-
     /**
      * Shutdown and cleanup resources for this instance.
      */
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
new file mode 100644
index 0000000000..065f1cfa29
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.raft.server.RaftGroupEventsListener.noopLsnr;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for the Raft Learner functionality.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ItLearnersTest extends IgniteAbstractTest {
+    private static final ReplicationGroupId RAFT_GROUP_ID = new ReplicationGroupId() {
+        @Override
+        public String toString() {
+            return "test";
+        }
+    };
+
+    private static final List<NetworkAddress> ADDRS = List.of(
+            new NetworkAddress("localhost", 5000),
+            new NetworkAddress("localhost", 5001),
+            new NetworkAddress("localhost", 5002)
+    );
+
+    private static final TestRaftMessagesFactory MESSAGES_FACTORY = new TestRaftMessagesFactory();
+
+    @InjectConfiguration
+    private static RaftConfiguration raftConfiguration;
+
+    private final List<RaftNode> nodes = new ArrayList<>(ADDRS.size());
+
+    /**
+     * Test WriteCommand.
+     */
+    @Transferable(10)
+    public interface TestWriteCommand extends NetworkMessage, WriteCommand {
+        String value();
+
+        static TestWriteCommand create(String value) {
+            return MESSAGES_FACTORY.testWriteCommand().value(value).build();
+        }
+    }
+
+    /** Mock Raft node. */
+    private class RaftNode implements AutoCloseable {
+        final ClusterService clusterService;
+
+        final Loza loza;
+
+        RaftNode(ClusterService clusterService) {
+            this.clusterService = clusterService;
+
+            Path raftDir = workDir.resolve(clusterService.localConfiguration().getName());
+
+            loza = new Loza(clusterService, raftConfiguration, raftDir, new HybridClock());
+        }
+
+        ClusterNode localMember() {
+            return clusterService.topologyService().localMember();
+        }
+
+        Peer asPeer() {
+            return new Peer(localMember().address());
+        }
+
+        void start() {
+            clusterService.start();
+            loza.start();
+        }
+
+        @Override
+        public void close() throws Exception {
+            closeAll(
+                    loza == null ? null : () -> loza.stopRaftGroup(RAFT_GROUP_ID),
+                    loza == null ? null : loza::beforeNodeStop,
+                    clusterService == null ? null : clusterService::beforeNodeStop,
+                    loza == null ? null : loza::stop,
+                    clusterService == null ? null : clusterService::stop
+            );
+        }
+    }
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) {
+        var nodeFinder = new StaticNodeFinder(ADDRS);
+
+        ADDRS.stream()
+                .map(addr -> clusterService(testInfo, addr.port(), nodeFinder))
+                .map(RaftNode::new)
+                .forEach(nodes::add);
+
+        nodes.parallelStream().forEach(RaftNode::start);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        closeAll(nodes);
+    }
+
+    /**
+     * Tests that it is possible to replicate and read data from learners.
+     */
+    @Test
+    public void testReadWriteLearners() throws Exception {
+        List<TestRaftGroupListener> listeners = IntStream.range(0, nodes.size())
+                .mapToObj(i -> new TestRaftGroupListener())
+                .collect(toList());
+
+        RaftNode follower = nodes.get(0);
+        List<RaftNode> learners = nodes.subList(1, nodes.size());
+
+        List<CompletableFuture<RaftGroupService>> services = IntStream.range(0, nodes.size())
+                .mapToObj(i -> startRaftGroup(nodes.get(i), listeners.get(i), List.of(follower), learners))
+                .collect(toList());
+
+        // Check that learners and peers have been set correctly.
+        services.forEach(service -> {
+            CompletableFuture<RaftGroupService> refreshMembers = service
+                    .thenCompose(s -> s.refreshMembers(true).thenApply(v -> s));
+
+            assertThat(refreshMembers.thenApply(RaftGroupService::leader), willBe(follower.asPeer()));
+            assertThat(refreshMembers.thenApply(RaftGroupService::peers), will(contains(follower.asPeer())));
+            assertThat(refreshMembers.thenApply(RaftGroupService::learners), will(containsInAnyOrder(toPeerArray(learners))));
+        });
+
+        listeners.forEach(listener -> assertThat(listener.storage, is(empty())));
+
+        // Test writing data.
+        CompletableFuture<?> writeFuture = services.get(0)
+                .thenCompose(s -> s.run(TestWriteCommand.create("foo")).thenApply(v -> s))
+                .thenCompose(s -> s.run(TestWriteCommand.create("bar")));
+
+        assertThat(writeFuture, willCompleteSuccessfully());
+
+        for (TestRaftGroupListener listener : listeners) {
+            assertThat(listener.storage.poll(1, TimeUnit.SECONDS), is("foo"));
+            assertThat(listener.storage.poll(1, TimeUnit.SECONDS), is("bar"));
+        }
+    }
+
+    /**
+     * Tests {@link RaftGroupService#addLearners} functionality.
+     */
+    @Test
+    public void testAddLearners() {
+        RaftNode follower = nodes.get(0);
+        List<RaftNode> learners = nodes.subList(1, nodes.size());
+
+        CompletableFuture<RaftGroupService> service1 =
+                startRaftGroup(follower, new TestRaftGroupListener(), List.of(follower), List.of());
+
+        assertThat(service1.thenApply(RaftGroupService::leader), willBe(follower.asPeer()));
+        assertThat(service1.thenApply(RaftGroupService::learners), willBe(empty()));
+
+        CompletableFuture<Void> addLearners = service1
+                .thenCompose(s -> s.addLearners(Arrays.asList(toPeerArray(learners))));
+
+        assertThat(addLearners, willCompleteSuccessfully());
+
+        CompletableFuture<RaftGroupService> service2 =
+                startRaftGroup(nodes.get(1), new TestRaftGroupListener(), List.of(follower), learners);
+
+        // Check that learners and peers have been set correctly.
+        Stream.of(service1, service2).forEach(service -> {
+            CompletableFuture<RaftGroupService> refreshMembers = service
+                    .thenCompose(s -> s.refreshMembers(true).thenApply(v -> s));
+
+            assertThat(refreshMembers.thenApply(RaftGroupService::leader), willBe(follower.asPeer()));
+            assertThat(refreshMembers.thenApply(RaftGroupService::peers), will(contains(follower.asPeer())));
+            assertThat(refreshMembers.thenApply(RaftGroupService::learners), will(containsInAnyOrder(toPeerArray(learners))));
+        });
+    }
+
+    /**
+     * Tests that if the only follower is stopped, then the majority is lost.
+     */
+    @Test
+    public void testLostLeadership() throws Exception {
+        RaftNode follower = nodes.get(0);
+        List<RaftNode> learners = nodes.subList(1, nodes.size());
+
+        List<CompletableFuture<RaftGroupService>> services = nodes.stream()
+                .map(node -> startRaftGroup(node, new TestRaftGroupListener(), List.of(follower), learners))
+                .collect(toList());
+
+        // Wait for the leader to be elected.
+        services.forEach(service -> assertThat(
+                service.thenCompose(s -> s.refreshLeader().thenApply(v -> s.leader())),
+                willBe(follower.asPeer()))
+        );
+
+        nodes.set(0, null).close();
+
+        assertThat(services.get(1).thenCompose(s -> s.run(TestWriteCommand.create("foo"))), willThrow(TimeoutException.class));
+    }
+
+    /**
+     * Tests that the majority is not lost even if all learners are stopped.
+     */
+    @Test
+    public void testLostLearners() throws Exception {
+        RaftNode follower = nodes.get(0);
+        List<RaftNode> learners = nodes.subList(1, nodes.size());
+
+        List<CompletableFuture<RaftGroupService>> services = nodes.stream()
+                .map(node -> startRaftGroup(node, new TestRaftGroupListener(), List.of(follower), learners))
+                .collect(toList());
+
+        // Wait for the leader to be elected.
+        services.forEach(service -> assertThat(
+                service.thenCompose(s -> s.refreshLeader().thenApply(v -> s.leader())),
+                willBe(follower.asPeer()))
+        );
+
+        nodes.set(1, null).close();
+        nodes.set(2, null).close();
+
+        assertThat(services.get(0).thenCompose(RaftGroupService::refreshLeader), willCompleteSuccessfully());
+    }
+
+    private CompletableFuture<RaftGroupService> startRaftGroup(
+            RaftNode raftNode,
+            RaftGroupListener listener,
+            List<RaftNode> peers,
+            List<RaftNode> learners
+    ) {
+        try {
+            CompletableFuture<RaftGroupService> future = raftNode.loza.prepareRaftGroup(
+                    RAFT_GROUP_ID,
+                    peers.stream().map(RaftNode::localMember).collect(toList()),
+                    learners.stream().map(RaftNode::localMember).collect(toList()),
+                    () -> listener,
+                    () -> noopLsnr,
+                    RaftGroupOptions.defaults()
+            );
+
+            return future.thenApply(s -> {
+                // Decrease the default timeout to make tests faster.
+                s.timeout(100);
+
+                return s;
+            });
+        } catch (NodeStoppingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static class TestRaftGroupListener implements RaftGroupListener {
+        final BlockingQueue<String> storage = new LinkedBlockingQueue<>();
+
+        @Override
+        public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+            iterator.forEachRemaining(closure -> {
+                assertThat(closure.command(), is(instanceOf(TestWriteCommand.class)));
+
+                TestWriteCommand writeCommand = (TestWriteCommand) closure.command();
+
+                if (!storage.contains(writeCommand.value())) {
+                    storage.add(writeCommand.value());
+                }
+
+                closure.result(null);
+            });
+        }
+
+        @Override
+        public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        }
+
+        @Override
+        public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+        }
+
+        @Override
+        public boolean onSnapshotLoad(Path path) {
+            return true;
+        }
+
+        @Override
+        public void onShutdown() {
+        }
+    }
+
+    private static Peer[] toPeerArray(List<RaftNode> nodes) {
+        return nodes.stream().map(RaftNode::asPeer).toArray(Peer[]::new);
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 387b8008fa..9cb1dff244 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.raft;
 
+import static org.apache.ignite.internal.raft.server.RaftGroupEventsListener.noopLsnr;
+
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.List;
@@ -171,15 +173,16 @@ public class Loza implements IgniteComponent {
             Supplier<RaftGroupListener> lsnrSupplier,
             RaftGroupOptions groupOptions
     ) throws NodeStoppingException {
-        return prepareRaftGroup(groupId, nodes, lsnrSupplier, () -> RaftGroupEventsListener.noopLsnr, groupOptions);
+        return prepareRaftGroup(groupId, nodes, List.of(), lsnrSupplier, () -> noopLsnr, groupOptions);
     }
 
     /**
-     * Creates a raft group service providing operations on a raft group. If {@code nodes} contains the current node, then raft group starts
-     * on the current node.
+     * Creates a raft group service providing operations on a raft group. If {@code nodes} or {@code learnerNodes}
+     * contains the current node, then raft group starts on the current node.
      *
      * @param groupId Raft group id.
      * @param nodes Raft group nodes.
+     * @param learnerNodes Raft learner nodes.
      * @param lsnrSupplier Raft group listener supplier.
      * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
      * @param groupOptions Options to apply to the group.
@@ -189,6 +192,7 @@ public class Loza implements IgniteComponent {
     public CompletableFuture<RaftGroupService> prepareRaftGroup(
             ReplicationGroupId groupId,
             List<ClusterNode> nodes,
+            List<ClusterNode> learnerNodes,
             Supplier<RaftGroupListener> lsnrSupplier,
             Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
             RaftGroupOptions groupOptions
@@ -198,7 +202,7 @@ public class Loza implements IgniteComponent {
         }
 
         try {
-            return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions);
+            return prepareRaftGroupInternal(groupId, nodes, learnerNodes, lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions);
         } finally {
             busyLock.leaveBusy();
         }
@@ -209,46 +213,28 @@ public class Loza implements IgniteComponent {
      *
      * @param groupId Raft group id.
      * @param nodes Raft group nodes.
+     * @param learnerNodes Raft learner nodes.
      * @param lsnrSupplier Raft group listener supplier.
-     * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+     * @param eventsLsnrSupplier Raft group events listener supplier.
      * @param groupOptions Options to apply to the group.
      * @return Future representing pending completion of the operation.
      */
     private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(
             ReplicationGroupId groupId,
             List<ClusterNode> nodes,
+            List<ClusterNode> learnerNodes,
             Supplier<RaftGroupListener> lsnrSupplier,
-            Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+            Supplier<RaftGroupEventsListener> eventsLsnrSupplier,
             RaftGroupOptions groupOptions
     ) {
-        assert !nodes.isEmpty();
-
-        List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
-
-        boolean hasLocalRaft = shouldHaveRaftGroupLocally(nodes);
-
-        if (hasLocalRaft) {
-            LOG.info("Start new raft node for group={} with initial peers={}", groupId, peers);
+        List<Peer> peers = nodesToPeers(nodes);
+        List<Peer> learners = nodesToPeers(learnerNodes);
 
-            if (!raftServer.startRaftGroup(groupId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
-                throw new IgniteInternalException(IgniteStringFormatter.format(
-                        "Raft group on the node is already started [raftGrp={}]",
-                        groupId
-                ));
-            }
+        if (shouldHaveRaftGroupLocally(nodes) || shouldHaveRaftGroupLocally(learnerNodes)) {
+            startRaftGroupNodeInternal(groupId, peers, learners, lsnrSupplier.get(), eventsLsnrSupplier.get(), groupOptions);
         }
 
-        return RaftGroupServiceImpl.start(
-                groupId,
-                clusterNetSvc,
-                FACTORY,
-                RETRY_TIMEOUT,
-                RPC_TIMEOUT,
-                peers,
-                true,
-                DELAY,
-                executor
-        );
+        return startRaftGroupServiceInternal(groupId, peers, learners);
     }
 
     /**
@@ -257,7 +243,7 @@ public class Loza implements IgniteComponent {
      * @param grpId Raft group id.
      * @param nodes Full set of raft group nodes.
      * @param lsnr Raft group listener.
-     * @param raftGrpEvtsLsnr Raft group events listener.
+     * @param eventsLsnr Raft group events listener.
      * @param groupOptions Options to apply to the group.
      * @throws NodeStoppingException If node stopping intention was detected.
      */
@@ -265,26 +251,15 @@ public class Loza implements IgniteComponent {
             ReplicationGroupId grpId,
             Collection<ClusterNode> nodes,
             RaftGroupListener lsnr,
-            RaftGroupEventsListener raftGrpEvtsLsnr,
+            RaftGroupEventsListener eventsLsnr,
             RaftGroupOptions groupOptions
     ) throws NodeStoppingException {
-        assert !nodes.isEmpty();
-
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
-
-            LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
-
-            if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnr, lsnr, peers, groupOptions)) {
-                throw new IgniteInternalException(IgniteStringFormatter.format(
-                        "Raft group on the node is already started [raftGrp={}]",
-                        grpId
-                ));
-            }
+            startRaftGroupNodeInternal(grpId, nodesToPeers(nodes), List.of(), lsnr, eventsLsnr, groupOptions);
         } finally {
             busyLock.leaveBusy();
         }
@@ -306,25 +281,64 @@ public class Loza implements IgniteComponent {
             throw new NodeStoppingException();
         }
 
-        List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
-
         try {
-            return RaftGroupServiceImpl.start(
-                    grpId,
-                    clusterNetSvc,
-                    FACTORY,
-                    RETRY_TIMEOUT,
-                    RPC_TIMEOUT,
-                    peers,
-                    true,
-                    DELAY,
-                    executor
-            );
+            return startRaftGroupServiceInternal(grpId, nodesToPeers(nodes), List.of());
         } finally {
             busyLock.leaveBusy();
         }
     }
 
+    private void startRaftGroupNodeInternal(
+            ReplicationGroupId grpId,
+            List<Peer> peers,
+            List<Peer> learners,
+            RaftGroupListener lsnr,
+            RaftGroupEventsListener raftGrpEvtsLsnr,
+            RaftGroupOptions groupOptions
+    ) {
+        assert !peers.isEmpty();
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
+        }
+
+        boolean started = raftServer.startRaftGroup(grpId, raftGrpEvtsLsnr, lsnr, peers, learners, groupOptions);
+
+        if (!started) {
+            throw new IgniteInternalException(IgniteStringFormatter.format(
+                    "Raft group on the node is already started [raftGrp={}]",
+                    grpId
+            ));
+        }
+    }
+
+    private CompletableFuture<RaftGroupService> startRaftGroupServiceInternal(
+            ReplicationGroupId grpId,
+            List<Peer> peers,
+            List<Peer> learners
+    ) {
+        assert !peers.isEmpty();
+
+        return RaftGroupServiceImpl.start(
+                grpId,
+                clusterNetSvc,
+                FACTORY,
+                RETRY_TIMEOUT,
+                RPC_TIMEOUT,
+                peers,
+                learners,
+                true,
+                DELAY,
+                executor
+        );
+    }
+
+    private static List<Peer> nodesToPeers(Collection<ClusterNode> nodes) {
+        return nodes.stream()
+                .map(node -> new Peer(node.address()))
+                .collect(Collectors.toUnmodifiableList());
+    }
+
     /**
      * Stops a raft group on the current node.
      *
@@ -337,7 +351,9 @@ public class Loza implements IgniteComponent {
         }
 
         try {
-            LOG.info("Stop raft group={}", groupId);
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Stop raft group={}", groupId);
+            }
 
             raftServer.stopRaftGroup(groupId);
         } finally {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index 8cade1e588..3e6ee72d60 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -41,21 +41,27 @@ public interface RaftServer extends IgniteComponent {
     /**
      * Starts a raft group bound to this cluster node.
      *
-     * @param groupId     Group id.
-     * @param lsnr        The listener.
-     * @param initialConf Inititial group configuration.
+     * @param groupId Group id.
+     * @param lsnr The listener.
+     * @param peers Peers configuration.
      * @param groupOptions Options to apply to the group.
      * @return {@code True} if a group was successfully started, {@code False} when the group with given name is already exists.
      */
-    boolean startRaftGroup(ReplicationGroupId groupId, RaftGroupListener lsnr, List<Peer> initialConf, RaftGroupOptions groupOptions);
+    boolean startRaftGroup(
+            ReplicationGroupId groupId,
+            RaftGroupListener lsnr,
+            List<Peer> peers,
+            RaftGroupOptions groupOptions
+    );
 
     /**
      * Starts a raft group bound to this cluster node.
      *
-     * @param groupId     Group id.
-     * @param evLsnr      Listener for group membership and other events.
-     * @param lsnr        Listener for state machine events.
-     * @param initialConf Inititial group configuration.
+     * @param groupId Group id.
+     * @param evLsnr Listener for group membership and other events.
+     * @param lsnr Listener for state machine events.
+     * @param peers Peers configuration.
+     * @param learners Learners configuration.
      * @param groupOptions Options to apply to the group.
      * @return {@code True} if a group was successfully started, {@code False} when the group with given name is already exists.
      */
@@ -63,7 +69,8 @@ public interface RaftServer extends IgniteComponent {
             ReplicationGroupId groupId,
             RaftGroupEventsListener evLsnr,
             RaftGroupListener lsnr,
-            List<Peer> initialConf,
+            List<Peer> peers,
+            List<Peer> learners,
             RaftGroupOptions groupOptions
     );
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index f685423809..d3c008a7e2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -338,10 +338,10 @@ public class JraftServerImpl implements RaftServer {
     public boolean startRaftGroup(
             ReplicationGroupId groupId,
             RaftGroupListener lsnr,
-            @Nullable List<Peer> initialConf,
+            List<Peer> peers,
             RaftGroupOptions groupOptions
     ) {
-        return startRaftGroup(groupId, RaftGroupEventsListener.noopLsnr, lsnr, initialConf, groupOptions);
+        return startRaftGroup(groupId, RaftGroupEventsListener.noopLsnr, lsnr, peers, List.of(), groupOptions);
     }
 
     /** {@inheritDoc} */
@@ -350,7 +350,8 @@ public class JraftServerImpl implements RaftServer {
             ReplicationGroupId replicaGrpId,
             RaftGroupEventsListener evLsnr,
             RaftGroupListener lsnr,
-            @Nullable List<Peer> initialConf,
+            List<Peer> peers,
+            List<Peer> learners,
             RaftGroupOptions groupOptions
     ) {
         String grpId = replicaGrpId.toString();
@@ -403,11 +404,11 @@ public class JraftServerImpl implements RaftServer {
 
             nodeOptions.setServiceFactory(serviceFactory);
 
-            if (initialConf != null) {
-                List<PeerId> mapped = initialConf.stream().map(PeerId::fromPeer).collect(Collectors.toList());
+            List<PeerId> peerIds = peers.stream().map(PeerId::fromPeer).collect(Collectors.toList());
 
-                nodeOptions.setInitialConf(new Configuration(mapped, null));
-            }
+            List<PeerId> learnerIds = learners.stream().map(PeerId::fromPeer).collect(Collectors.toList());
+
+            nodeOptions.setInitialConf(new Configuration(peerIds, learnerIds));
 
             IgniteRpcClient client = new IgniteRpcClient(service);
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 79dbe0aef4..34cf4ebd28 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -40,7 +40,6 @@ import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -63,7 +62,6 @@ import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.ReadCommand;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -133,13 +131,14 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         int timeout,
         int rpcTimeout,
         List<Peer> peers,
+        List<Peer> learners,
         Peer leader,
         long retryDelay,
         ScheduledExecutorService executor
     ) {
         this.cluster = requireNonNull(cluster);
-        this.peers = requireNonNull(peers);
-        this.learners = Collections.emptyList();
+        this.peers = List.copyOf(peers);
+        this.learners = List.copyOf(learners);
         this.factory = factory;
         this.timeout = timeout;
         this.rpcTimeout = rpcTimeout;
@@ -171,11 +170,12 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         int timeout,
         int rpcTimeout,
         List<Peer> peers,
+        List<Peer> learners,
         boolean getLeader,
         long retryDelay,
         ScheduledExecutorService executor
     ) {
-        var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, rpcTimeout, peers, null, retryDelay, executor);
+        var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, rpcTimeout, peers, learners, null, retryDelay, executor);
 
         if (!getLeader)
             return CompletableFuture.completedFuture(service);
@@ -219,7 +219,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         long retryDelay,
         ScheduledExecutorService executor
     ) {
-        return start(groupId, cluster, factory, timeout, timeout, peers, getLeader, retryDelay, executor);
+        return start(groupId, cluster, factory, timeout, timeout, peers, List.of(), getLeader, retryDelay, executor);
     }
 
     /** {@inheritDoc} */
@@ -524,16 +524,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         return fut.thenApply(resp -> (R) resp.result());
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
-        ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
-
-        return cluster.messagingService().invoke(peer.address(), req, rpcTimeout)
-                .thenApply(resp -> (R) ((ActionResponse) resp).result());
-    }
-
     /** {@inheritDoc} */
     @Override public void shutdown() {
         // No-op.
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index e8b778abcf..9d80ef3955 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.network.annotations.Transferable;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index fdbeb01411..8290720a49 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.rpc;
 
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.messages.TestMessageGroup;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestMessageGroup.java b/modules/raft/src/test/java/org/apache/ignite/raft/messages/TestMessageGroup.java
similarity index 92%
rename from modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestMessageGroup.java
rename to modules/raft/src/test/java/org/apache/ignite/raft/messages/TestMessageGroup.java
index 8803650407..6dbca8cdef 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestMessageGroup.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/messages/TestMessageGroup.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.jraft.rpc;
+package org.apache.ignite.raft.messages;
 
 import org.apache.ignite.network.annotations.MessageGroup;
 
@@ -23,5 +23,5 @@ import org.apache.ignite.network.annotations.MessageGroup;
  * Message group for tests.
  */
 @MessageGroup(groupType = 4, groupName = "TestRaftMessages")
-public class TestMessageGroup {
+public interface TestMessageGroup {
 }
diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index d1a6841fd9..2f890c225e 100644
--- a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -198,10 +198,11 @@ public class RaftServerImpl implements RaftServer {
             ReplicationGroupId groupId,
             RaftGroupEventsListener evLsnr,
             RaftGroupListener lsnr,
-            List<Peer> initialConf,
+            List<Peer> peers,
+            List<Peer> learners,
             RaftGroupOptions groupOptions
     ) {
-        return startRaftGroup(groupId, lsnr, initialConf, groupOptions);
+        return startRaftGroup(groupId, lsnr, peers, groupOptions);
     }
 
     /** {@inheritDoc} */