You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/10/24 13:45:33 UTC
[ignite-3] branch main updated: IGNITE-17941 Add learners to Raft API (#1236)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a8a8bf3c45 IGNITE-17941 Add learners to Raft API (#1236)
a8a8bf3c45 is described below
commit a8a8bf3c45554a0e7929d4505be93857c80db967
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Mon Oct 24 16:45:27 2022 +0300
IGNITE-17941 Add learners to Raft API (#1236)
---
.../management/ClusterManagementGroupManager.java | 1 +
.../CompletableFutureExceptionMatcher.java | 102 ++++++
.../matchers/CompletableFutureMatcher.java | 2 +-
.../raft/client/service/RaftGroupService.java | 16 +-
.../ignite/internal/raft/ItLearnersTest.java | 355 +++++++++++++++++++++
.../java/org/apache/ignite/internal/raft/Loza.java | 134 ++++----
.../ignite/internal/raft/server/RaftServer.java | 25 +-
.../internal/raft/server/impl/JraftServerImpl.java | 15 +-
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 22 +-
.../ignite/raft/jraft/rpc/AbstractRpcTest.java | 1 +
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 1 +
.../{jraft/rpc => messages}/TestMessageGroup.java | 4 +-
.../internal/raft/server/impl/RaftServerImpl.java | 5 +-
13 files changed, 572 insertions(+), 111 deletions(-)
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index ee0eaba79b..040928be05 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -472,6 +472,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
.prepareRaftGroup(
INSTANCE,
resolveNodes(clusterService, nodeNames),
+ List.of(),
() -> {
clusterStateStorage.start();
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java
new file mode 100644
index 0000000000..16df6f740d
--- /dev/null
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * {@link Matcher} that waits for the given future to complete exceptionally and then forwards the exception to the nested matcher.
+ */
+public class CompletableFutureExceptionMatcher extends TypeSafeMatcher<CompletableFuture<?>> {
+ /** Timeout in seconds. */
+ private static final int TIMEOUT_SECONDS = 30;
+
+ /** Matcher to forward the exception of the completable future. */
+ private final Matcher<? extends Exception> matcher;
+
+ /**
+ * Constructor.
+ *
+ * @param matcher Matcher to forward the exception of the completable future.
+ */
+ private CompletableFutureExceptionMatcher(Matcher<? extends Exception> matcher) {
+ this.matcher = matcher;
+ }
+
+ @Override
+ protected boolean matchesSafely(CompletableFuture<?> item) {
+ try {
+ item.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ return false;
+ } catch (Exception e) {
+ return matcher.matches(unwrapException(e));
+ }
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a future that completes with an exception that ").appendDescriptionOf(matcher);
+ }
+
+ @Override
+ protected void describeMismatchSafely(CompletableFuture<?> item, Description mismatchDescription) {
+ if (item.isCompletedExceptionally()) {
+ try {
+ item.join();
+ } catch (Exception e) {
+ mismatchDescription.appendText("was completed exceptionally with ").appendValue(unwrapException(e));
+ }
+ } else if (item.isDone()) {
+ mismatchDescription.appendText("was completed successfully");
+ } else {
+ mismatchDescription.appendText("was not completed");
+ }
+ }
+
+ private static Throwable unwrapException(Exception e) {
+ if (e instanceof ExecutionException || e instanceof CompletionException) {
+ return e.getCause();
+ } else {
+ return e;
+ }
+ }
+
+ /**
+ * Creates a matcher that matches a future that completes exceptionally with an exception that matches the nested matcher.
+ */
+ public static CompletableFutureExceptionMatcher willThrow(Matcher<? extends Exception> matcher) {
+ return new CompletableFutureExceptionMatcher(matcher);
+ }
+
+ /**
+ * Creates a matcher that matches a future that completes with an exception of the provided type.
+ */
+ public static CompletableFutureExceptionMatcher willThrow(Class<? extends Exception> cls) {
+ return willThrow(is(instanceOf(cls)));
+ }
+}
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index e294fb524b..d5bab3a906 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -81,7 +81,7 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu
}
/**
- * A shorter version of {@link #willBe} to be used with some matchers for aesthetical reasons.
+ * A shorter version of {@link #willBe} to be used with some matchers for aesthetic reasons.
*/
public static <T> CompletableFutureMatcher<T> will(Matcher<T> matcher) {
return willBe(matcher);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
index d204d7dfd0..e4291b1ddd 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
@@ -25,8 +25,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -51,7 +49,7 @@ public interface RaftGroupService {
/**
* Returns group id.
*/
- @NotNull ReplicationGroupId groupId();
+ ReplicationGroupId groupId();
/**
* Returns default timeout for the operations in milliseconds.
@@ -240,18 +238,6 @@ public interface RaftGroupService {
*/
<R> CompletableFuture<R> run(Command cmd);
- /**
- * Runs a read command on a given peer.
- *
- * <p>Read commands can see stale data (in the past).
- *
- * @param peer Peer id.
- * @param cmd The command.
- * @param <R> Execution result type.
- * @return A future with the execution result.
- */
- <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd);
-
/**
* Shutdown and cleanup resources for this instance.
*/
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
new file mode 100644
index 0000000000..065f1cfa29
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.raft.server.RaftGroupEventsListener.noopLsnr;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for the Raft Learner functionality.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ItLearnersTest extends IgniteAbstractTest {
+ private static final ReplicationGroupId RAFT_GROUP_ID = new ReplicationGroupId() {
+ @Override
+ public String toString() {
+ return "test";
+ }
+ };
+
+ private static final List<NetworkAddress> ADDRS = List.of(
+ new NetworkAddress("localhost", 5000),
+ new NetworkAddress("localhost", 5001),
+ new NetworkAddress("localhost", 5002)
+ );
+
+ private static final TestRaftMessagesFactory MESSAGES_FACTORY = new TestRaftMessagesFactory();
+
+ @InjectConfiguration
+ private static RaftConfiguration raftConfiguration;
+
+ private final List<RaftNode> nodes = new ArrayList<>(ADDRS.size());
+
+ /**
+ * Test WriteCommand.
+ */
+ @Transferable(10)
+ public interface TestWriteCommand extends NetworkMessage, WriteCommand {
+ String value();
+
+ static TestWriteCommand create(String value) {
+ return MESSAGES_FACTORY.testWriteCommand().value(value).build();
+ }
+ }
+
+ /** Mock Raft node. */
+ private class RaftNode implements AutoCloseable {
+ final ClusterService clusterService;
+
+ final Loza loza;
+
+ RaftNode(ClusterService clusterService) {
+ this.clusterService = clusterService;
+
+ Path raftDir = workDir.resolve(clusterService.localConfiguration().getName());
+
+ loza = new Loza(clusterService, raftConfiguration, raftDir, new HybridClock());
+ }
+
+ ClusterNode localMember() {
+ return clusterService.topologyService().localMember();
+ }
+
+ Peer asPeer() {
+ return new Peer(localMember().address());
+ }
+
+ void start() {
+ clusterService.start();
+ loza.start();
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeAll(
+ loza == null ? null : () -> loza.stopRaftGroup(RAFT_GROUP_ID),
+ loza == null ? null : loza::beforeNodeStop,
+ clusterService == null ? null : clusterService::beforeNodeStop,
+ loza == null ? null : loza::stop,
+ clusterService == null ? null : clusterService::stop
+ );
+ }
+ }
+
+ @BeforeEach
+ void setUp(TestInfo testInfo) {
+ var nodeFinder = new StaticNodeFinder(ADDRS);
+
+ ADDRS.stream()
+ .map(addr -> clusterService(testInfo, addr.port(), nodeFinder))
+ .map(RaftNode::new)
+ .forEach(nodes::add);
+
+ nodes.parallelStream().forEach(RaftNode::start);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ closeAll(nodes);
+ }
+
+ /**
+ * Tests that it is possible to replicate and read data from learners.
+ */
+ @Test
+ public void testReadWriteLearners() throws Exception {
+ List<TestRaftGroupListener> listeners = IntStream.range(0, nodes.size())
+ .mapToObj(i -> new TestRaftGroupListener())
+ .collect(toList());
+
+ RaftNode follower = nodes.get(0);
+ List<RaftNode> learners = nodes.subList(1, nodes.size());
+
+ List<CompletableFuture<RaftGroupService>> services = IntStream.range(0, nodes.size())
+ .mapToObj(i -> startRaftGroup(nodes.get(i), listeners.get(i), List.of(follower), learners))
+ .collect(toList());
+
+ // Check that learners and peers have been set correctly.
+ services.forEach(service -> {
+ CompletableFuture<RaftGroupService> refreshMembers = service
+ .thenCompose(s -> s.refreshMembers(true).thenApply(v -> s));
+
+ assertThat(refreshMembers.thenApply(RaftGroupService::leader), willBe(follower.asPeer()));
+ assertThat(refreshMembers.thenApply(RaftGroupService::peers), will(contains(follower.asPeer())));
+ assertThat(refreshMembers.thenApply(RaftGroupService::learners), will(containsInAnyOrder(toPeerArray(learners))));
+ });
+
+ listeners.forEach(listener -> assertThat(listener.storage, is(empty())));
+
+ // Test writing data.
+ CompletableFuture<?> writeFuture = services.get(0)
+ .thenCompose(s -> s.run(TestWriteCommand.create("foo")).thenApply(v -> s))
+ .thenCompose(s -> s.run(TestWriteCommand.create("bar")));
+
+ assertThat(writeFuture, willCompleteSuccessfully());
+
+ for (TestRaftGroupListener listener : listeners) {
+ assertThat(listener.storage.poll(1, TimeUnit.SECONDS), is("foo"));
+ assertThat(listener.storage.poll(1, TimeUnit.SECONDS), is("bar"));
+ }
+ }
+
+ /**
+ * Tests {@link RaftGroupService#addLearners} functionality.
+ */
+ @Test
+ public void testAddLearners() {
+ RaftNode follower = nodes.get(0);
+ List<RaftNode> learners = nodes.subList(1, nodes.size());
+
+ CompletableFuture<RaftGroupService> service1 =
+ startRaftGroup(follower, new TestRaftGroupListener(), List.of(follower), List.of());
+
+ assertThat(service1.thenApply(RaftGroupService::leader), willBe(follower.asPeer()));
+ assertThat(service1.thenApply(RaftGroupService::learners), willBe(empty()));
+
+ CompletableFuture<Void> addLearners = service1
+ .thenCompose(s -> s.addLearners(Arrays.asList(toPeerArray(learners))));
+
+ assertThat(addLearners, willCompleteSuccessfully());
+
+ CompletableFuture<RaftGroupService> service2 =
+ startRaftGroup(nodes.get(1), new TestRaftGroupListener(), List.of(follower), learners);
+
+ // Check that learners and peers have been set correctly.
+ Stream.of(service1, service2).forEach(service -> {
+ CompletableFuture<RaftGroupService> refreshMembers = service
+ .thenCompose(s -> s.refreshMembers(true).thenApply(v -> s));
+
+ assertThat(refreshMembers.thenApply(RaftGroupService::leader), willBe(follower.asPeer()));
+ assertThat(refreshMembers.thenApply(RaftGroupService::peers), will(contains(follower.asPeer())));
+ assertThat(refreshMembers.thenApply(RaftGroupService::learners), will(containsInAnyOrder(toPeerArray(learners))));
+ });
+ }
+
+ /**
+ * Tests that if the only follower is stopped, then the majority is lost.
+ */
+ @Test
+ public void testLostLeadership() throws Exception {
+ RaftNode follower = nodes.get(0);
+ List<RaftNode> learners = nodes.subList(1, nodes.size());
+
+ List<CompletableFuture<RaftGroupService>> services = nodes.stream()
+ .map(node -> startRaftGroup(node, new TestRaftGroupListener(), List.of(follower), learners))
+ .collect(toList());
+
+ // Wait for the leader to be elected.
+ services.forEach(service -> assertThat(
+ service.thenCompose(s -> s.refreshLeader().thenApply(v -> s.leader())),
+ willBe(follower.asPeer()))
+ );
+
+ nodes.set(0, null).close();
+
+ assertThat(services.get(1).thenCompose(s -> s.run(TestWriteCommand.create("foo"))), willThrow(TimeoutException.class));
+ }
+
+ /**
+ * Tests that even if all learners are stopped, the majority is not lost.
+ */
+ @Test
+ public void testLostLearners() throws Exception {
+ RaftNode follower = nodes.get(0);
+ List<RaftNode> learners = nodes.subList(1, nodes.size());
+
+ List<CompletableFuture<RaftGroupService>> services = nodes.stream()
+ .map(node -> startRaftGroup(node, new TestRaftGroupListener(), List.of(follower), learners))
+ .collect(toList());
+
+ // Wait for the leader to be elected.
+ services.forEach(service -> assertThat(
+ service.thenCompose(s -> s.refreshLeader().thenApply(v -> s.leader())),
+ willBe(follower.asPeer()))
+ );
+
+ nodes.set(1, null).close();
+ nodes.set(2, null).close();
+
+ assertThat(services.get(0).thenCompose(RaftGroupService::refreshLeader), willCompleteSuccessfully());
+ }
+
+ private CompletableFuture<RaftGroupService> startRaftGroup(
+ RaftNode raftNode,
+ RaftGroupListener listener,
+ List<RaftNode> peers,
+ List<RaftNode> learners
+ ) {
+ try {
+ CompletableFuture<RaftGroupService> future = raftNode.loza.prepareRaftGroup(
+ RAFT_GROUP_ID,
+ peers.stream().map(RaftNode::localMember).collect(toList()),
+ learners.stream().map(RaftNode::localMember).collect(toList()),
+ () -> listener,
+ () -> noopLsnr,
+ RaftGroupOptions.defaults()
+ );
+
+ return future.thenApply(s -> {
+ // Decrease the default timeout to make tests faster.
+ s.timeout(100);
+
+ return s;
+ });
+ } catch (NodeStoppingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static class TestRaftGroupListener implements RaftGroupListener {
+ final BlockingQueue<String> storage = new LinkedBlockingQueue<>();
+
+ @Override
+ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+ iterator.forEachRemaining(closure -> {
+ assertThat(closure.command(), is(instanceOf(TestWriteCommand.class)));
+
+ TestWriteCommand writeCommand = (TestWriteCommand) closure.command();
+
+ if (!storage.contains(writeCommand.value())) {
+ storage.add(writeCommand.value());
+ }
+
+ closure.result(null);
+ });
+ }
+
+ @Override
+ public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+ }
+
+ @Override
+ public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+ }
+
+ @Override
+ public boolean onSnapshotLoad(Path path) {
+ return true;
+ }
+
+ @Override
+ public void onShutdown() {
+ }
+ }
+
+ private static Peer[] toPeerArray(List<RaftNode> nodes) {
+ return nodes.stream().map(RaftNode::asPeer).toArray(Peer[]::new);
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 387b8008fa..9cb1dff244 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.raft;
+import static org.apache.ignite.internal.raft.server.RaftGroupEventsListener.noopLsnr;
+
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
@@ -171,15 +173,16 @@ public class Loza implements IgniteComponent {
Supplier<RaftGroupListener> lsnrSupplier,
RaftGroupOptions groupOptions
) throws NodeStoppingException {
- return prepareRaftGroup(groupId, nodes, lsnrSupplier, () -> RaftGroupEventsListener.noopLsnr, groupOptions);
+ return prepareRaftGroup(groupId, nodes, List.of(), lsnrSupplier, () -> noopLsnr, groupOptions);
}
/**
- * Creates a raft group service providing operations on a raft group. If {@code nodes} contains the current node, then raft group starts
- * on the current node.
+ * Creates a raft group service providing operations on a raft group. If {@code nodes} or {@code learnerNodes}
+ * contains the current node, then the raft group is started on the current node.
*
* @param groupId Raft group id.
* @param nodes Raft group nodes.
+ * @param learnerNodes Raft learner nodes.
* @param lsnrSupplier Raft group listener supplier.
* @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
* @param groupOptions Options to apply to the group.
@@ -189,6 +192,7 @@ public class Loza implements IgniteComponent {
public CompletableFuture<RaftGroupService> prepareRaftGroup(
ReplicationGroupId groupId,
List<ClusterNode> nodes,
+ List<ClusterNode> learnerNodes,
Supplier<RaftGroupListener> lsnrSupplier,
Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
RaftGroupOptions groupOptions
@@ -198,7 +202,7 @@ public class Loza implements IgniteComponent {
}
try {
- return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions);
+ return prepareRaftGroupInternal(groupId, nodes, learnerNodes, lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions);
} finally {
busyLock.leaveBusy();
}
@@ -209,46 +213,28 @@ public class Loza implements IgniteComponent {
*
* @param groupId Raft group id.
* @param nodes Raft group nodes.
+ * @param learnerNodes Raft learner nodes.
* @param lsnrSupplier Raft group listener supplier.
- * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+ * @param eventsLsnrSupplier Raft group events listener supplier.
* @param groupOptions Options to apply to the group.
* @return Future representing pending completion of the operation.
*/
private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(
ReplicationGroupId groupId,
List<ClusterNode> nodes,
+ List<ClusterNode> learnerNodes,
Supplier<RaftGroupListener> lsnrSupplier,
- Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+ Supplier<RaftGroupEventsListener> eventsLsnrSupplier,
RaftGroupOptions groupOptions
) {
- assert !nodes.isEmpty();
-
- List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
-
- boolean hasLocalRaft = shouldHaveRaftGroupLocally(nodes);
-
- if (hasLocalRaft) {
- LOG.info("Start new raft node for group={} with initial peers={}", groupId, peers);
+ List<Peer> peers = nodesToPeers(nodes);
+ List<Peer> learners = nodesToPeers(learnerNodes);
- if (!raftServer.startRaftGroup(groupId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
- throw new IgniteInternalException(IgniteStringFormatter.format(
- "Raft group on the node is already started [raftGrp={}]",
- groupId
- ));
- }
+ if (shouldHaveRaftGroupLocally(nodes) || shouldHaveRaftGroupLocally(learnerNodes)) {
+ startRaftGroupNodeInternal(groupId, peers, learners, lsnrSupplier.get(), eventsLsnrSupplier.get(), groupOptions);
}
- return RaftGroupServiceImpl.start(
- groupId,
- clusterNetSvc,
- FACTORY,
- RETRY_TIMEOUT,
- RPC_TIMEOUT,
- peers,
- true,
- DELAY,
- executor
- );
+ return startRaftGroupServiceInternal(groupId, peers, learners);
}
/**
@@ -257,7 +243,7 @@ public class Loza implements IgniteComponent {
* @param grpId Raft group id.
* @param nodes Full set of raft group nodes.
* @param lsnr Raft group listener.
- * @param raftGrpEvtsLsnr Raft group events listener.
+ * @param eventsLsnr Raft group events listener.
* @param groupOptions Options to apply to the group.
* @throws NodeStoppingException If node stopping intention was detected.
*/
@@ -265,26 +251,15 @@ public class Loza implements IgniteComponent {
ReplicationGroupId grpId,
Collection<ClusterNode> nodes,
RaftGroupListener lsnr,
- RaftGroupEventsListener raftGrpEvtsLsnr,
+ RaftGroupEventsListener eventsLsnr,
RaftGroupOptions groupOptions
) throws NodeStoppingException {
- assert !nodes.isEmpty();
-
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
try {
- List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
-
- LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
-
- if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnr, lsnr, peers, groupOptions)) {
- throw new IgniteInternalException(IgniteStringFormatter.format(
- "Raft group on the node is already started [raftGrp={}]",
- grpId
- ));
- }
+ startRaftGroupNodeInternal(grpId, nodesToPeers(nodes), List.of(), lsnr, eventsLsnr, groupOptions);
} finally {
busyLock.leaveBusy();
}
@@ -306,25 +281,64 @@ public class Loza implements IgniteComponent {
throw new NodeStoppingException();
}
- List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
-
try {
- return RaftGroupServiceImpl.start(
- grpId,
- clusterNetSvc,
- FACTORY,
- RETRY_TIMEOUT,
- RPC_TIMEOUT,
- peers,
- true,
- DELAY,
- executor
- );
+ return startRaftGroupServiceInternal(grpId, nodesToPeers(nodes), List.of());
} finally {
busyLock.leaveBusy();
}
}
+ private void startRaftGroupNodeInternal(
+ ReplicationGroupId grpId,
+ List<Peer> peers,
+ List<Peer> learners,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener raftGrpEvtsLsnr,
+ RaftGroupOptions groupOptions
+ ) {
+ assert !peers.isEmpty();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
+ }
+
+ boolean started = raftServer.startRaftGroup(grpId, raftGrpEvtsLsnr, lsnr, peers, learners, groupOptions);
+
+ if (!started) {
+ throw new IgniteInternalException(IgniteStringFormatter.format(
+ "Raft group on the node is already started [raftGrp={}]",
+ grpId
+ ));
+ }
+ }
+
+ private CompletableFuture<RaftGroupService> startRaftGroupServiceInternal(
+ ReplicationGroupId grpId,
+ List<Peer> peers,
+ List<Peer> learners
+ ) {
+ assert !peers.isEmpty();
+
+ return RaftGroupServiceImpl.start(
+ grpId,
+ clusterNetSvc,
+ FACTORY,
+ RETRY_TIMEOUT,
+ RPC_TIMEOUT,
+ peers,
+ learners,
+ true,
+ DELAY,
+ executor
+ );
+ }
+
+ private static List<Peer> nodesToPeers(Collection<ClusterNode> nodes) {
+ return nodes.stream()
+ .map(node -> new Peer(node.address()))
+ .collect(Collectors.toUnmodifiableList());
+ }
+
/**
* Stops a raft group on the current node.
*
@@ -337,7 +351,9 @@ public class Loza implements IgniteComponent {
}
try {
- LOG.info("Stop raft group={}", groupId);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Stop raft group={}", groupId);
+ }
raftServer.stopRaftGroup(groupId);
} finally {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index 8cade1e588..3e6ee72d60 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -41,21 +41,27 @@ public interface RaftServer extends IgniteComponent {
/**
* Starts a raft group bound to this cluster node.
*
- * @param groupId Group id.
- * @param lsnr The listener.
- * @param initialConf Inititial group configuration.
+ * @param groupId Group id.
+ * @param lsnr The listener.
+ * @param peers Peers configuration.
* @param groupOptions Options to apply to the group.
* @return {@code True} if a group was successfully started, {@code False} when the group with given name is already exists.
*/
- boolean startRaftGroup(ReplicationGroupId groupId, RaftGroupListener lsnr, List<Peer> initialConf, RaftGroupOptions groupOptions);
+ boolean startRaftGroup(
+ ReplicationGroupId groupId,
+ RaftGroupListener lsnr,
+ List<Peer> peers,
+ RaftGroupOptions groupOptions
+ );
/**
* Starts a raft group bound to this cluster node.
*
- * @param groupId Group id.
- * @param evLsnr Listener for group membership and other events.
- * @param lsnr Listener for state machine events.
- * @param initialConf Inititial group configuration.
+ * @param groupId Group id.
+ * @param evLsnr Listener for group membership and other events.
+ * @param lsnr Listener for state machine events.
+ * @param peers Peers configuration.
+ * @param learners Learners configuration.
* @param groupOptions Options to apply to the group.
* @return {@code True} if a group was successfully started, {@code False} when the group with given name is already exists.
*/
@@ -63,7 +69,8 @@ public interface RaftServer extends IgniteComponent {
ReplicationGroupId groupId,
RaftGroupEventsListener evLsnr,
RaftGroupListener lsnr,
- List<Peer> initialConf,
+ List<Peer> peers,
+ List<Peer> learners,
RaftGroupOptions groupOptions
);
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index f685423809..d3c008a7e2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -338,10 +338,10 @@ public class JraftServerImpl implements RaftServer {
public boolean startRaftGroup(
ReplicationGroupId groupId,
RaftGroupListener lsnr,
- @Nullable List<Peer> initialConf,
+ List<Peer> peers,
RaftGroupOptions groupOptions
) {
- return startRaftGroup(groupId, RaftGroupEventsListener.noopLsnr, lsnr, initialConf, groupOptions);
+ return startRaftGroup(groupId, RaftGroupEventsListener.noopLsnr, lsnr, peers, List.of(), groupOptions);
}
/** {@inheritDoc} */
@@ -350,7 +350,8 @@ public class JraftServerImpl implements RaftServer {
ReplicationGroupId replicaGrpId,
RaftGroupEventsListener evLsnr,
RaftGroupListener lsnr,
- @Nullable List<Peer> initialConf,
+ List<Peer> peers,
+ List<Peer> learners,
RaftGroupOptions groupOptions
) {
String grpId = replicaGrpId.toString();
@@ -403,11 +404,11 @@ public class JraftServerImpl implements RaftServer {
nodeOptions.setServiceFactory(serviceFactory);
- if (initialConf != null) {
- List<PeerId> mapped = initialConf.stream().map(PeerId::fromPeer).collect(Collectors.toList());
+ List<PeerId> peerIds = peers.stream().map(PeerId::fromPeer).collect(Collectors.toList());
- nodeOptions.setInitialConf(new Configuration(mapped, null));
- }
+ List<PeerId> learnerIds = learners.stream().map(PeerId::fromPeer).collect(Collectors.toList());
+
+ nodeOptions.setInitialConf(new Configuration(peerIds, learnerIds));
IgniteRpcClient client = new IgniteRpcClient(service);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 79dbe0aef4..34cf4ebd28 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -40,7 +40,6 @@ import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -63,7 +62,6 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -133,13 +131,14 @@ public class RaftGroupServiceImpl implements RaftGroupService {
int timeout,
int rpcTimeout,
List<Peer> peers,
+ List<Peer> learners,
Peer leader,
long retryDelay,
ScheduledExecutorService executor
) {
this.cluster = requireNonNull(cluster);
- this.peers = requireNonNull(peers);
- this.learners = Collections.emptyList();
+ this.peers = List.copyOf(peers);
+ this.learners = List.copyOf(learners);
this.factory = factory;
this.timeout = timeout;
this.rpcTimeout = rpcTimeout;
@@ -171,11 +170,12 @@ public class RaftGroupServiceImpl implements RaftGroupService {
int timeout,
int rpcTimeout,
List<Peer> peers,
+ List<Peer> learners,
boolean getLeader,
long retryDelay,
ScheduledExecutorService executor
) {
- var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, rpcTimeout, peers, null, retryDelay, executor);
+ var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, rpcTimeout, peers, learners, null, retryDelay, executor);
if (!getLeader)
return CompletableFuture.completedFuture(service);
@@ -219,7 +219,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
long retryDelay,
ScheduledExecutorService executor
) {
- return start(groupId, cluster, factory, timeout, timeout, peers, getLeader, retryDelay, executor);
+ return start(groupId, cluster, factory, timeout, timeout, peers, List.of(), getLeader, retryDelay, executor);
}
/** {@inheritDoc} */
@@ -524,16 +524,6 @@ public class RaftGroupServiceImpl implements RaftGroupService {
return fut.thenApply(resp -> (R) resp.result());
}
- /**
- * {@inheritDoc}
- */
- @Override public <R> CompletableFuture<R> run(Peer peer, ReadCommand cmd) {
- ActionRequest req = factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(false).build();
-
- return cluster.messagingService().invoke(peer.address(), req, rpcTimeout)
- .thenApply(resp -> (R) ((ActionResponse) resp).result());
- }
-
/** {@inheritDoc} */
@Override public void shutdown() {
// No-op.
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index e8b778abcf..9d80ef3955 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.network.annotations.Transferable;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.apache.ignite.raft.messages.TestRaftMessagesFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index fdbeb01411..8290720a49 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.rpc;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.messages.TestMessageGroup;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestMessageGroup.java b/modules/raft/src/test/java/org/apache/ignite/raft/messages/TestMessageGroup.java
similarity index 92%
rename from modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestMessageGroup.java
rename to modules/raft/src/test/java/org/apache/ignite/raft/messages/TestMessageGroup.java
index 8803650407..6dbca8cdef 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/TestMessageGroup.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/messages/TestMessageGroup.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.jraft.rpc;
+package org.apache.ignite.raft.messages;
import org.apache.ignite.network.annotations.MessageGroup;
@@ -23,5 +23,5 @@ import org.apache.ignite.network.annotations.MessageGroup;
* Message group for tests.
*/
@MessageGroup(groupType = 4, groupName = "TestRaftMessages")
-public class TestMessageGroup {
+public interface TestMessageGroup {
}
diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index d1a6841fd9..2f890c225e 100644
--- a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -198,10 +198,11 @@ public class RaftServerImpl implements RaftServer {
ReplicationGroupId groupId,
RaftGroupEventsListener evLsnr,
RaftGroupListener lsnr,
- List<Peer> initialConf,
+ List<Peer> peers,
+ List<Peer> learners,
RaftGroupOptions groupOptions
) {
- return startRaftGroup(groupId, lsnr, initialConf, groupOptions);
+ return startRaftGroup(groupId, lsnr, peers, groupOptions);
}
/** {@inheritDoc} */