You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/03/27 08:04:31 UTC
[ignite-3] 01/01: IGNITE-18744 Implement primary replica side leaseGrant handler (#1765)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 9323ee9e1723a82ef0905e99dee34bc4141d56d7
Author: Denis Chudov <mo...@gmail.com>
AuthorDate: Mon Mar 27 10:42:09 2023 +0300
IGNITE-18744 Implement primary replica side leaseGrant handler (#1765)
---
.../ignite/internal/hlc/HybridTimestamp.java | 42 ++
.../apache/ignite/internal/util/IgniteUtils.java | 50 ++
.../build.gradle | 11 +-
.../message/LeaseGrantedMessage.java} | 30 +-
.../message/LeaseGrantedMessageResponse.java} | 24 +-
.../message/PlacementDriverMessageGroup.java} | 32 +-
.../message/PlacementDriverReplicaMessage.java} | 31 +-
modules/placement-driver/build.gradle | 2 +
.../internal/placementdriver/ActiveActorTest.java | 592 ++++++++++++++++++++-
.../PlacementDriverManagerTest.java | 35 +-
.../internal/placementdriver/LeaseUpdater.java | 25 +-
.../placementdriver/PlacementDriverManager.java | 74 +--
.../apache/ignite/internal/raft/RaftManager.java | 33 ++
.../ignite/internal/raft/RaftServiceFactory.java | 45 ++
.../internal/raft/service/RaftGroupService.java | 7 +
.../java/org/apache/ignite/internal/raft/Loza.java | 83 ++-
.../ignite/internal/raft/RaftGroupServiceImpl.java | 13 +
.../ignite/internal/raft/RaftGroupServiceTest.java | 36 ++
modules/replicator/build.gradle | 14 +-
.../raft/client/TopologyAwareRaftGroupService.java | 22 +-
.../TopologyAwareRaftGroupServiceFactory.java | 84 +++
.../apache/ignite/internal/replicator/Replica.java | 174 +++++-
.../ignite/internal/replicator/ReplicaManager.java | 181 ++++---
.../client/TopologyAwareRaftGroupServiceTest.java | 101 +---
.../replicator/PlacementDriverReplicaSideTest.java | 340 ++++++++++++
modules/rest-api/openapi/openapi.yaml | 4 +-
modules/runner/build.gradle | 1 +
.../storage/ItRebalanceDistributedTest.java | 14 +-
.../runner/app/ItIgniteNodeRestartTest.java | 12 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 28 +-
modules/sql-engine/build.gradle | 1 +
.../sql/engine/exec/MockedStructuresTest.java | 4 +-
.../ignite/distributed/ItTablePersistenceTest.java | 3 +-
.../distributed/ItTxDistributedTestSingleNode.java | 61 ++-
.../ignite/distributed/ReplicaUnavailableTest.java | 9 +-
.../internal/table/distributed/TableManager.java | 29 +-
.../table/distributed/raft/PartitionListener.java | 10 +-
.../replicator/PartitionReplicaListener.java | 13 +-
.../TableManagerDistributionZonesTest.java | 4 +-
.../table/distributed/TableManagerTest.java | 6 +-
.../raft/PartitionCommandListenerTest.java | 6 +-
.../table/impl/DummyInternalTableImpl.java | 3 +-
settings.gradle | 2 +
43 files changed, 1881 insertions(+), 410 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index d3c8415cc8..e06a975ae4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -34,6 +34,12 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria
/** A constant holding the maximum value a {@code HybridTimestamp} can have. */
public static final HybridTimestamp MAX_VALUE = new HybridTimestamp(Long.MAX_VALUE, Integer.MAX_VALUE);
+ /**
+ * Cluster clock skew. Timestamps whose physical parts differ by at most this value (inclusive) are treated as not comparable.
+ * The unit is that of the physical part of the timestamp (presumably milliseconds — TODO confirm).
+ * TODO: IGNITE-18978 Method to compare timestamps with clock skew.
+ */
+ private static final long CLOCK_SKEW = 7L;
+
/** Physical clock. */
private final long physical;
@@ -131,6 +137,42 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria
return result;
}
+ /**
+ * Compares this timestamp with another one, taking the clock skew into account.
+ * The timestamps are comparable only if the physical part of this one lies outside [t2 - CLOCK_SKEW; t2 + CLOCK_SKEW],
+ * where t2 is the physical part of {@code anotherTimestamp}.
+ * TODO: IGNITE-18978 Method to compare timestamps with clock skew.
+ *
+ * @param anotherTimestamp Another timestamp.
+ * @return {@code 0} if the timestamps are not comparable, otherwise the result of {@link #compareTo(HybridTimestamp)}.
+ */
+ private int compareWithClockSkew(HybridTimestamp anotherTimestamp) {
+ long thisPhysical = getPhysical();
+ long otherPhysical = anotherTimestamp.getPhysical();
+
+ // Both bounds subtract the skew instead of adding it: "physical + CLOCK_SKEW" overflows for physical == Long.MAX_VALUE
+ // (see MAX_VALUE), which made timestamps within the skew of each other look comparable. Subtraction is safe as long as
+ // physical >= Long.MIN_VALUE + CLOCK_SKEW.
+ if (thisPhysical - CLOCK_SKEW <= otherPhysical && otherPhysical - CLOCK_SKEW <= thisPhysical) {
+ return 0;
+ }
+
+ return compareTo(anotherTimestamp);
+ }
+
+ /**
+ * Checks whether this timestamp is strictly before the given one, with the clock skew taken into account.
+ * Returns {@code false} whenever the timestamps are within the clock skew of each other (not comparable).
+ *
+ * @param anotherTimestamp Timestamp to compare with.
+ * @return {@code true} if this timestamp is before {@code anotherTimestamp}.
+ */
+ public boolean before(HybridTimestamp anotherTimestamp) {
+ return 0 > compareWithClockSkew(anotherTimestamp);
+ }
+
+ /**
+ * Checks whether this timestamp is strictly after the given one, with the clock skew taken into account.
+ * Returns {@code false} whenever the timestamps are within the clock skew of each other (not comparable).
+ *
+ * @param anotherTimestamp Timestamp to compare with.
+ * @return {@code true} if this timestamp is after {@code anotherTimestamp}.
+ */
+ public boolean after(HybridTimestamp anotherTimestamp) {
+ return 0 < compareWithClockSkew(anotherTimestamp);
+ }
+
@Override
public int compareTo(HybridTimestamp other) {
if (this.physical == other.physical) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index efa3d4f15e..ea16c07854 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -48,10 +48,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -1005,4 +1007,52 @@ public class IgniteUtils {
return Optional.empty();
}
+
+ /**
+ * Repeatedly invokes the operation until it completes successfully or {@code stopRetryCondition} returns {@code true}
+ * for its failure.
+ *
+ * @param operation Supplier of the asynchronous operation to (re)try.
+ * @param stopRetryCondition Function that receives the failure and returns {@code true} to stop retrying.
+ * @param executor Executor that runs every retry attempt.
+ * @return Future completed with the first successful result, or exceptionally with the failure that stopped the retries.
+ */
+ public static <T> CompletableFuture<T> retryOperationUntilSuccess(
+ Supplier<CompletableFuture<T>> operation,
+ Function<Throwable, Boolean> stopRetryCondition,
+ Executor executor
+ ) {
+ var resultFuture = new CompletableFuture<T>();
+
+ retryOperationUntilSuccess(operation, stopRetryCondition, resultFuture, executor);
+
+ return resultFuture;
+ }
+
+ /**
+ * Retries the operation until it succeeds or {@code stopRetryCondition} returns {@code true} for its failure.
+ *
+ * <p>If the operation, the condition or the executor throws instead of returning (or completing) a future, {@code fut} is
+ * completed exceptionally with that error. Otherwise such an error, raised inside a {@code whenComplete} callback or an
+ * executor task, would be lost and {@code fut} would never complete.
+ *
+ * @param operation Operation.
+ * @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be stopped.
+ * @param fut Future that is completed when the operation succeeds or fails with an exception accepted by the condition.
+ * @param executor Executor to make retry in.
+ * @param <T> Result type.
+ */
+ public static <T> void retryOperationUntilSuccess(
+ Supplier<CompletableFuture<T>> operation,
+ Function<Throwable, Boolean> stopRetryCondition,
+ CompletableFuture<T> fut,
+ Executor executor
+ ) {
+ try {
+ operation.get().whenComplete((res, e) -> {
+ try {
+ if (e == null) {
+ fut.complete(res);
+ } else if (stopRetryCondition.apply(e)) {
+ fut.completeExceptionally(e);
+ } else {
+ executor.execute(() -> retryOperationUntilSuccess(operation, stopRetryCondition, fut, executor));
+ }
+ } catch (Throwable t) {
+ // The condition or the executor failed: an exception escaping this callback would be swallowed.
+ fut.completeExceptionally(t);
+ }
+ });
+ } catch (Throwable t) {
+ // The operation threw synchronously (or returned null); when running in a retry task nobody else would observe it.
+ fut.completeExceptionally(t);
+ }
+ }
}
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/build.gradle
similarity index 79%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/build.gradle
index 34b50e0c45..81779696ef 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/build.gradle
@@ -19,18 +19,11 @@ apply from: "$rootDir/buildscripts/java-core.gradle"
apply from: "$rootDir/buildscripts/publishing.gradle"
apply from: "$rootDir/buildscripts/java-junit5.gradle"
-
dependencies {
annotationProcessor project(":ignite-network-annotation-processor")
+
implementation project(':ignite-core')
- implementation project(':ignite-raft-api')
- implementation project(':ignite-configuration-api')
implementation project(':ignite-network-api')
- implementation libs.jetbrains.annotations
- implementation libs.fastutil.core
-
- testImplementation libs.mockito.core
- testImplementation libs.mockito.junit
}
-description = 'ignite-replicator'
+description = 'ignite-placement-driver-api'
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
similarity index 55%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
index 34b50e0c45..ef009b0001 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
@@ -15,22 +15,22 @@
* limitations under the License.
*/
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.placementdriver.message;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
-dependencies {
- annotationProcessor project(":ignite-network-annotation-processor")
- implementation project(':ignite-core')
- implementation project(':ignite-raft-api')
- implementation project(':ignite-configuration-api')
- implementation project(':ignite-network-api')
- implementation libs.jetbrains.annotations
- implementation libs.fastutil.core
+/**
+ * Lease granted message for the replication group returned by {@link #groupId()}.
+ */
+@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE)
+public interface LeaseGrantedMessage extends PlacementDriverReplicaMessage {
+ /**
+ * Gets the lease start time.
+ *
+ * @return Lease start time.
+ */
+ @Marshallable
+ HybridTimestamp leaseStartTime();
- testImplementation libs.mockito.core
- testImplementation libs.mockito.junit
-}
+ /**
+ * Gets the lease expiration time.
+ *
+ * @return Lease expiration time.
+ */
+ @Marshallable
+ HybridTimestamp leaseExpirationTime();
-description = 'ignite-replicator'
+ /**
+ * Gets the force flag.
+ * NOTE(review): its exact semantics (e.g. whether the receiving replica may decline the lease) are defined by the
+ * replica-side handler — confirm and document here.
+ *
+ * @return Force flag.
+ */
+ boolean force();
+}
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessageResponse.java
similarity index 55%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessageResponse.java
index 34b50e0c45..c6addb77ad 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessageResponse.java
@@ -15,22 +15,16 @@
* limitations under the License.
*/
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.placementdriver.message;
+import org.apache.ignite.network.annotations.Transferable;
-dependencies {
- annotationProcessor project(":ignite-network-annotation-processor")
- implementation project(':ignite-core')
- implementation project(':ignite-raft-api')
- implementation project(':ignite-configuration-api')
- implementation project(':ignite-network-api')
- implementation libs.jetbrains.annotations
- implementation libs.fastutil.core
+/**
+ * Response for lease granted message ({@link LeaseGrantedMessage}).
+ */
+@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE_RESPONSE)
+public interface LeaseGrantedMessageResponse extends PlacementDriverReplicaMessage {
+ /**
+ * Gets the accepted flag.
+ *
+ * @return Whether the granted lease was accepted (presumably by the replica that received the lease — confirm).
+ */
+ boolean accepted();
- testImplementation libs.mockito.core
- testImplementation libs.mockito.junit
+ /**
+ * Gets the redirect proposal.
+ * NOTE(review): presumably the consistent id of the node proposed to receive the lease instead, possibly {@code null}
+ * when {@link #accepted()} is {@code true} — confirm.
+ *
+ * @return Redirect proposal.
+ */
+ String redirectProposal();
}
-
-description = 'ignite-replicator'
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverMessageGroup.java
similarity index 53%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverMessageGroup.java
index 34b50e0c45..b73cdfeea1 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverMessageGroup.java
@@ -15,22 +15,24 @@
* limitations under the License.
*/
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.placementdriver.message;
+import static org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup.GROUP_NAME;
+import static org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup.GROUP_TYPE;
-dependencies {
- annotationProcessor project(":ignite-network-annotation-processor")
- implementation project(':ignite-core')
- implementation project(':ignite-raft-api')
- implementation project(':ignite-configuration-api')
- implementation project(':ignite-network-api')
- implementation libs.jetbrains.annotations
- implementation libs.fastutil.core
+import org.apache.ignite.network.annotations.MessageGroup;
- testImplementation libs.mockito.core
- testImplementation libs.mockito.junit
-}
+/**
+ * Message group for placement driver messages.
+ */
+@MessageGroup(groupType = GROUP_TYPE, groupName = GROUP_NAME)
+public interface PlacementDriverMessageGroup {
+ /** Placement driver message group type. */
+ short GROUP_TYPE = 11;
+
+ /** Placement driver message group name. */
+ String GROUP_NAME = "PlacementDriverMessages";
-description = 'ignite-replicator'
+ /** Message type of {@link LeaseGrantedMessage}. */
+ short LEASE_GRANTED_MESSAGE = 0;
+
+ /** Message type of {@link LeaseGrantedMessageResponse}. */
+ short LEASE_GRANTED_MESSAGE_RESPONSE = 1;
+}
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverReplicaMessage.java
similarity index 55%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverReplicaMessage.java
index 34b50e0c45..9f26f228a8 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverReplicaMessage.java
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.placementdriver.message;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
-dependencies {
- annotationProcessor project(":ignite-network-annotation-processor")
- implementation project(':ignite-core')
- implementation project(':ignite-raft-api')
- implementation project(':ignite-configuration-api')
- implementation project(':ignite-network-api')
- implementation libs.jetbrains.annotations
- implementation libs.fastutil.core
-
- testImplementation libs.mockito.core
- testImplementation libs.mockito.junit
+/**
+ * Base interface for placement driver messages related to a replication group.
+ * It is extended by {@link LeaseGrantedMessage} and {@link LeaseGrantedMessageResponse}.
+ */
+public interface PlacementDriverReplicaMessage extends NetworkMessage {
+ /**
+ * Gets a replication group id.
+ *
+ * @return Replication group id.
+ */
+ @Marshallable
+ ReplicationGroupId groupId();
}
-
-description = 'ignite-replicator'
diff --git a/modules/placement-driver/build.gradle b/modules/placement-driver/build.gradle
index c2bb5c894b..02bf76e2ef 100644
--- a/modules/placement-driver/build.gradle
+++ b/modules/placement-driver/build.gradle
@@ -34,6 +34,7 @@ dependencies {
implementation project(':ignite-affinity')
implementation project(':ignite-vault')
implementation project(':ignite-rocksdb-common:')
+ implementation project(':ignite-replicator')
implementation libs.jetbrains.annotations
@@ -48,6 +49,7 @@ dependencies {
integrationTestImplementation project(':ignite-schema')
integrationTestImplementation project(':ignite-table')
integrationTestImplementation project(':ignite-affinity')
+ integrationTestImplementation project(':ignite-replicator')
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-network')))
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 8cc78bb9e5..ba5c983058 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -19,22 +19,69 @@ package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.placementdriver.ActiveActorTest.TestReplicationGroup.GROUP_ID;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.nio.file.Path;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.LeaderChangeNotification;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -44,9 +91,21 @@ import org.mockito.quality.Strictness;
/**
* Placement driver active actor test.
*/
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
-public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
+public class ActiveActorTest extends IgniteAbstractTest {
+ /** RAFT message factory. */
+ private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
+
+ /** Base node port. */
+ private static final int PORT_BASE = 1234;
+
+ @InjectConfiguration
+ protected RaftConfiguration raftConfiguration;
+
+ /** RPC executor. */
+ protected ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", log));
+
private Map<String, PlacementDriverManager> placementDriverManagers = new HashMap<>();
@Mock
@@ -57,36 +116,54 @@ public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
@AfterEach
@Override
- protected void afterTest() throws Exception {
+ public void tearDown(TestInfo testInfo) throws Exception {
List<AutoCloseable> closeables = placementDriverManagers.values().stream().map(p -> (AutoCloseable) p::stop).collect(toList());
closeAll(closeables);
placementDriverManagers.clear();
+
+ // The RPC executor is created by a field initializer, i.e. once per test instance; shut it down so that its threads
+ // do not outlive the test.
+ executor.shutdownNow();
+
- super.afterTest();
+ super.tearDown(testInfo);
}
- /** {@inheritDoc} */
- @Override
+ /**
+ * The method is called after every node of the cluster starts.
+ *
+ * @param nodeName Node name.
+ * @param clusterService Cluster service.
+ * @param dataPath Data path for raft node.
+ * @param placementDriverNodesNames Names of all nodes in raft group.
+ * @param eventsClientListener Raft events listener for client.
+ */
protected void afterNodeStart(
String nodeName,
ClusterService clusterService,
+ Path dataPath,
Set<String> placementDriverNodesNames,
RaftGroupEventsClientListener eventsClientListener
) {
+ var raftManager = new Loza(clusterService, raftConfiguration, dataPath, new HybridClockImpl());
+
+ LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService);
+
+ var raftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+ clusterService,
+ logicalTopologyService,
+ Loza.FACTORY,
+ eventsClientListener
+ );
+
PlacementDriverManager placementDriverManager = new PlacementDriverManager(
msm,
new VaultManager(new InMemoryVaultService()),
- TestReplicationGroup.GROUP_ID,
+ GROUP_ID,
clusterService,
- raftConfiguration,
() -> completedFuture(placementDriverNodesNames),
- new LogicalTopologyServiceTestImpl(clusterService),
- executor,
+ logicalTopologyService,
+ raftManager,
+ raftGroupServiceFactory,
tblsCfg,
- new HybridClockImpl(),
- eventsClientListener
+ new HybridClockImpl()
);
placementDriverManager.start();
@@ -95,27 +172,14 @@ public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
}
/**
- * The method is called after every node of the cluster starts.
+ * The method is called after every node of the cluster stops. Stops the node's placement driver manager and removes it
+ * from the tracked managers, so that it neither outlives the test nor takes part in the single-active-actor check.
*
* @param nodeName Node name.
+ * @throws Exception If the placement driver manager fails to stop.
*/
- @Override
- protected void afterNodeStop(String nodeName) {
- placementDriverManagers.remove(nodeName);
+ private void afterNodeStop(String nodeName) throws Exception {
+ PlacementDriverManager placementDriverManager = placementDriverManagers.remove(nodeName);
+
+ // tearDown() only stops the managers that are still in the map, so a removed manager has to be stopped here.
+ if (placementDriverManager != null) {
+ placementDriverManager.stop();
+ }
}
- /** {@inheritDoc} */
- @Override
- protected boolean afterInitCheckCondition(String leaderName) {
- return checkSingleActiveActor(leaderName);
- }
-
- /** {@inheritDoc} */
- @Override
- protected boolean afterLeaderChangeCheckCondition(String leaderName) {
- return checkSingleActiveActor(leaderName);
- }
-
private boolean checkSingleActiveActor(String leaderName) {
for (Map.Entry<String, PlacementDriverManager> e : placementDriverManagers.entrySet()) {
if (e.getValue().isActiveActor() != e.getKey().equals(leaderName)) {
@@ -125,4 +189,476 @@ public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
return true;
}
+
+ @Test
+ public void testOneNodeReplicationGroup(TestInfo testInfo) throws Exception {
+ var clusterServices = new HashMap<NetworkAddress, ClusterService>();
+ var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+ int nodes = 2;
+
+ TopologyAwareRaftGroupService raftClient = startCluster(
+ testInfo,
+ clusterServices,
+ raftServers,
+ addr -> true,
+ nodes,
+ PORT_BASE + 1
+ );
+
+ // Stop the cluster in "finally": if an assertion fails, the started nodes must not keep running into the next tests.
+ try {
+ CompletableFuture<ClusterNode> leaderFut = new CompletableFuture<>();
+
+ raftClient.subscribeLeader((node, term) -> leaderFut.complete(node));
+
+ ClusterNode leader = leaderFut.get(10, TimeUnit.SECONDS);
+
+ assertNotNull(leader);
+
+ afterInitCheckConditionWithWait(leader.name());
+ } finally {
+ stopCluster(clusterServices, raftServers, List.of(raftClient), nodes);
+ }
+ }
+
+ @Test
+ public void testChangeLeaderWhenActualLeft(TestInfo testInfo) throws Exception {
+ var clusterServices = new HashMap<NetworkAddress, ClusterService>();
+ var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+ int nodes = 3;
+ Predicate<NetworkAddress> isServerAddress = addr -> true;
+
+ TopologyAwareRaftGroupService raftClient = startCluster(
+ testInfo,
+ clusterServices,
+ raftServers,
+ isServerAddress,
+ nodes,
+ PORT_BASE
+ );
+
+ ClusterService clientClusterService = null;
+ TopologyAwareRaftGroupService raftClientNoInitialNotify = null;
+
+ // Everything started by the test is released in "finally", so a failed assertion cannot leak nodes into the next tests.
+ try {
+ raftClient.refreshLeader().get();
+
+ clientClusterService = clusterService(testInfo, PORT_BASE + nodes + 1, new StaticNodeFinder(getNetworkAddresses(nodes)));
+ clientClusterService.start();
+
+ raftClientNoInitialNotify = startTopologyAwareClient(
+ clientClusterService,
+ clusterServices,
+ isServerAddress,
+ nodes,
+ null,
+ false
+ );
+
+ AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+ AtomicReference<ClusterNode> leaderRefNoInitialNotify = new AtomicReference<>();
+ AtomicInteger callsCount = new AtomicInteger();
+
+ raftClient.subscribeLeader((node, term) -> leaderRef.set(node));
+
+ for (int i = 0; i < 2; i++) {
+ raftClientNoInitialNotify.unsubscribeLeader();
+
+ raftClientNoInitialNotify.subscribeLeader((node, term) -> {
+ callsCount.incrementAndGet();
+ leaderRefNoInitialNotify.set(node);
+ });
+ }
+
+ assertTrue(callsCount.get() <= 1);
+
+ assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000));
+
+ ClusterNode leader = leaderRef.get();
+
+ assertNotNull(leader);
+
+ log.info("Leader: " + leader);
+
+ afterInitCheckConditionWithWait(leader.name());
+
+ var leaderAddress = new NetworkAddress("localhost", leader.address().port());
+
+ var raftServiceToStop = raftServers.remove(leaderAddress);
+ raftServiceToStop.stopRaftNodes(GROUP_ID);
+ raftServiceToStop.stop();
+
+ afterNodeStop(leader.name());
+
+ clusterServices.remove(leaderAddress).stop();
+
+ assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()), 10_000));
+ // NOTE(review): this also passes while leaderRefNoInitialNotify is still null; testChangeLeaderForce additionally
+ // requires a non-null value.
+ assertTrue(waitForCondition(() -> !leader.equals(leaderRefNoInitialNotify.get()), 1000));
+
+ log.info("New Leader: " + leaderRef.get());
+
+ afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
+
+ raftClientNoInitialNotify.refreshLeader().get();
+
+ assertEquals(leaderRef.get().name(), raftClientNoInitialNotify.leader().consistentId());
+ } finally {
+ try {
+ stopCluster(
+ clusterServices,
+ raftServers,
+ raftClientNoInitialNotify == null ? List.of(raftClient) : List.of(raftClient, raftClientNoInitialNotify),
+ nodes
+ );
+ } finally {
+ if (clientClusterService != null) {
+ clientClusterService.stop();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testChangeLeaderForce(TestInfo testInfo) throws Exception {
+ var clusterServices = new HashMap<NetworkAddress, ClusterService>();
+ var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+ int nodes = 3;
+ Predicate<NetworkAddress> isServerAddress = addr -> true;
+
+ TopologyAwareRaftGroupService raftClient = startCluster(
+ testInfo,
+ clusterServices,
+ raftServers,
+ isServerAddress,
+ nodes,
+ PORT_BASE
+ );
+
+ ClusterService clientClusterService = null;
+ TopologyAwareRaftGroupService raftClientNoInitialNotify = null;
+
+ // Everything started by the test is released in "finally", so a failed assertion cannot leak nodes into the next tests.
+ try {
+ raftClient.refreshLeader().get();
+
+ clientClusterService = clusterService(testInfo, PORT_BASE + nodes + 1, new StaticNodeFinder(getNetworkAddresses(nodes)));
+ clientClusterService.start();
+
+ raftClientNoInitialNotify = startTopologyAwareClient(
+ clientClusterService,
+ clusterServices,
+ isServerAddress,
+ nodes,
+ null,
+ false
+ );
+
+ AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+ AtomicReference<ClusterNode> leaderRefNoInitialNotify = new AtomicReference<>();
+ AtomicInteger callsCount = new AtomicInteger();
+
+ raftClient.subscribeLeader((node, term) -> leaderRef.set(node));
+
+ for (int i = 0; i < 2; i++) {
+ raftClientNoInitialNotify.unsubscribeLeader();
+
+ raftClientNoInitialNotify.subscribeLeader((node, term) -> {
+ callsCount.incrementAndGet();
+ leaderRefNoInitialNotify.set(node);
+ });
+ }
+
+ assertTrue(callsCount.get() <= 1);
+
+ assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000));
+
+ ClusterNode leader = leaderRef.get();
+
+ assertNotNull(leader);
+
+ log.info("Leader: " + leader);
+
+ afterInitCheckConditionWithWait(leader.name());
+
+ Peer newLeaderPeer = raftClient.peers().stream()
+ .filter(peer -> !leader.name().equals(peer.consistentId()))
+ .findAny()
+ .orElseThrow();
+
+ log.info("Peer to transfer leader: " + newLeaderPeer);
+
+ raftClient.transferLeadership(newLeaderPeer).get();
+
+ String leaderId = newLeaderPeer.consistentId();
+
+ assertTrue(waitForCondition(() -> leaderId.equals(leaderRef.get().name()), 10_000));
+ assertTrue(waitForCondition(
+ () -> leaderRefNoInitialNotify.get() != null && leaderId.equals(leaderRefNoInitialNotify.get().name()), 1000)
+ );
+
+ log.info("New Leader: " + leaderRef.get());
+
+ afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
+
+ raftClient.refreshLeader().get();
+
+ assertEquals(leaderRef.get().name(), raftClient.leader().consistentId());
+ } finally {
+ try {
+ stopCluster(
+ clusterServices,
+ raftServers,
+ raftClientNoInitialNotify == null ? List.of(raftClient) : List.of(raftClient, raftClientNoInitialNotify),
+ nodes
+ );
+ } finally {
+ if (clientClusterService != null) {
+ clientClusterService.stop();
+ }
+ }
+ }
+ }
+
+ /**
+ * Shuts down the given RAFT clients, then, for every node address of the cluster, stops the RAFT server and the
+ * cluster service that are still registered for that address.
+ *
+ * @param clusterServices Cluster services by node address.
+ * @param raftServers RAFT servers by node address.
+ * @param raftClients RAFT clients to shut down, may be {@code null}.
+ * @param nodes Node count.
+ * @throws Exception If failed.
+ */
+ private void stopCluster(
+ HashMap<NetworkAddress, ClusterService> clusterServices,
+ HashMap<NetworkAddress, JraftServerImpl> raftServers,
+ List<TopologyAwareRaftGroupService> raftClients,
+ int nodes
+ ) throws Exception {
+ if (raftClients != null) {
+ for (TopologyAwareRaftGroupService raftClient : raftClients) {
+ raftClient.shutdown();
+ }
+ }
+
+ for (NetworkAddress address : getNetworkAddresses(nodes)) {
+ JraftServerImpl raftServer = raftServers.get(address);
+
+ if (raftServer != null) {
+ raftServer.stopRaftNodes(GROUP_ID);
+
+ raftServer.stop();
+ }
+
+ ClusterService netService = clusterServices.get(address);
+
+ if (netService != null) {
+ netService.stop();
+ }
+ }
+ }
+
+ /**
+ * Starts a cluster of {@code nodes} nodes. A RAFT server with a RAFT node of group {@code GROUP_ID} is started on every
+ * server node, and a topology aware RAFT client is started on the node whose port is {@code clientPort}.
+ *
+ * <p>Started cluster services and RAFT servers are put into the given maps, keyed by node address.
+ *
+ * @param testInfo Test info.
+ * @param clusterServices Map to be filled with the started cluster services.
+ * @param raftServers Map to be filled with the started RAFT servers (server nodes only).
+ * @param isServerAddress Closure to determine a server node.
+ * @param nodes Node count.
+ * @param clientPort Port of node where a client will start; that node must be a server node (asserted).
+ * @return Topology aware client, or {@code null} if no node has port {@code clientPort}.
+ */
+ private TopologyAwareRaftGroupService startCluster(
+ TestInfo testInfo,
+ HashMap<NetworkAddress, ClusterService> clusterServices,
+ HashMap<NetworkAddress, JraftServerImpl> raftServers,
+ Predicate<NetworkAddress> isServerAddress,
+ int nodes,
+ int clientPort
+ ) {
+ List<NetworkAddress> addresses = getNetworkAddresses(nodes);
+
+ var nodeFinder = new StaticNodeFinder(addresses);
+
+ // Stays null unless some address has the client port.
+ TopologyAwareRaftGroupService raftClient = null;
+
+ // First pass: start the cluster (network) service of every node before any RAFT server is created.
+ for (NetworkAddress addr : addresses) {
+ var cluster = clusterService(testInfo, addr.port(), nodeFinder);
+
+ cluster.start();
+
+ clusterServices.put(addr, cluster);
+ }
+
+ // RAFT peers are the nodes accepted by isServerAddress; their names form the placement driver node set.
+ PeersAndLearners peersAndLearners = peersAndLearners(clusterServices, isServerAddress, nodes);
+
+ Set<String> placementDriverNodesNames = peersAndLearners.peers().stream().map(Peer::consistentId).collect(toSet());
+
+ // Second pass: start RAFT servers on server nodes and the client on the client node.
+ for (NetworkAddress addr : addresses) {
+ var cluster = clusterServices.get(addr);
+
+ // The same listener instance is given to the RAFT server and, on the client node, to the RAFT client.
+ RaftGroupEventsClientListener eventsClientListener = new RaftGroupEventsClientListener();
+
+ if (isServerAddress.test(addr)) { //RAFT server node
+ var localPeer = peersAndLearners.peers().stream()
+ .filter(peer -> peer.consistentId().equals(cluster.topologyService().localMember().name())).findAny().get();
+
+ var dataPath = workDir.resolve("raft_" + localPeer.consistentId());
+
+ var raftServer = new JraftServerImpl(
+ cluster,
+ dataPath,
+ new NodeOptions(),
+ eventsClientListener
+ );
+ raftServer.start();
+
+ raftServer.startRaftNode(
+ new RaftNodeId(GROUP_ID, localPeer),
+ peersAndLearners,
+ new TestRaftGroupListener(),
+ RaftGroupOptions.defaults()
+ );
+
+ raftServers.put(addr, raftServer);
+
+ // Hook invoked once the RAFT node of this server node has been started.
+ afterNodeStart(localPeer.consistentId(), cluster, dataPath, placementDriverNodesNames, eventsClientListener);
+ }
+
+ if (addr.port() == clientPort) {
+ assertTrue(isServerAddress.test(addr));
+
+ raftClient = startTopologyAwareClient(cluster, clusterServices, isServerAddress, nodes, eventsClientListener, true);
+ }
+ }
+
+ return raftClient;
+ }
+
+ /**
+ * Starts a topology aware RAFT client of group {@code GROUP_ID} on the given node.
+ *
+ * @param localClusterService Cluster service of the node where the client starts.
+ * @param clusterServices Cluster services of all nodes, keyed by node address.
+ * @param isServerAddress Closure to determine a server node.
+ * @param nodes Node count.
+ * @param eventsClientListener Listener of RAFT group events. If {@code null}, a new listener is created and fed with the
+ * {@link LeaderChangeNotification} messages received by the local node.
+ * @param notifyOnSubscription Flag passed to {@link TopologyAwareRaftGroupService#start} (presumably whether a leader
+ * subscriber is notified right after subscribing; confirm against that method).
+ * @return Started client.
+ */
+ private TopologyAwareRaftGroupService startTopologyAwareClient(
+ ClusterService localClusterService,
+ Map<NetworkAddress, ClusterService> clusterServices,
+ Predicate<NetworkAddress> isServerAddress,
+ int nodes,
+ RaftGroupEventsClientListener eventsClientListener,
+ boolean notifyOnSubscription
+ ) {
+ RaftGroupEventsClientListener listener = eventsClientListener;
+
+ if (listener == null) {
+ RaftGroupEventsClientListener createdListener = new RaftGroupEventsClientListener();
+
+ // Without an externally supplied listener, route leader change notifications to the new one manually.
+ localClusterService.messagingService().addMessageHandler(RaftMessageGroup.class, (msg, sender, correlationId) -> {
+ if (!(msg instanceof LeaderChangeNotification)) {
+ return;
+ }
+
+ LeaderChangeNotification notification = (LeaderChangeNotification) msg;
+ ClusterNode senderNode = localClusterService.topologyService().getByConsistentId(sender);
+
+ createdListener.onLeaderElected(notification.groupId(), senderNode, notification.term());
+ });
+
+ listener = createdListener;
+ }
+
+ PeersAndLearners configuration = peersAndLearners(clusterServices, isServerAddress, nodes);
+
+ return (TopologyAwareRaftGroupService) TopologyAwareRaftGroupService.start(
+ GROUP_ID,
+ localClusterService,
+ FACTORY,
+ raftConfiguration,
+ configuration,
+ true,
+ executor,
+ new LogicalTopologyServiceTestImpl(localClusterService),
+ listener,
+ notifyOnSubscription
+ ).join();
+ }
+
+ /**
+ * Builds a RAFT configuration whose peers are the server nodes among the first {@code nodes} addresses.
+ *
+ * @param clusterServices Cluster services, keyed by node address.
+ * @param isServerAddress Closure to determine a server node.
+ * @param nodes Node count.
+ * @return Peers (no learners) identified by the consistent ids of the server nodes.
+ */
+ private static PeersAndLearners peersAndLearners(
+ Map<NetworkAddress, ClusterService> clusterServices,
+ Predicate<NetworkAddress> isServerAddress,
+ int nodes
+ ) {
+ Set<String> serverNodeNames = getNetworkAddresses(nodes).stream()
+ .filter(isServerAddress)
+ .map(clusterServices::get)
+ .map(clusterService -> clusterService.topologyService().localMember().name())
+ .collect(toSet());
+
+ return PeersAndLearners.fromConsistentIds(serverNodeNames);
+ }
+
+ /**
+ * Generates one {@code localhost} address per node, with consecutive ports starting at {@code PORT_BASE}.
+ *
+ * @param nodes Node count.
+ * @return List of network addresses, ordered by port.
+ */
+ private static List<NetworkAddress> getNetworkAddresses(int nodes) {
+ return IntStream.range(0, nodes)
+ .mapToObj(offset -> new NetworkAddress("localhost", PORT_BASE + offset))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Waits (timeout {@code 10_000}) until {@code afterInitCheckCondition(leaderName)} holds and fails the test otherwise.
+ *
+ * @param leaderName Current leader name.
+ * @throws InterruptedException If the wait is interrupted.
+ */
+ private void afterInitCheckConditionWithWait(String leaderName) throws InterruptedException {
+ boolean conditionReached = waitForCondition(() -> afterInitCheckCondition(leaderName), 10_000);
+
+ assertTrue(conditionReached);
+ }
+
+ /**
+ * Condition to be met after cluster and RAFT clients initialization; delegates to {@code checkSingleActiveActor}.
+ *
+ * @param leaderName Current leader name.
+ * @return Whether the condition holds.
+ */
+ private boolean afterInitCheckCondition(String leaderName) {
+ boolean singleActiveActor = checkSingleActiveActor(leaderName);
+
+ return singleActiveActor;
+ }
+
+ /**
+ * Waits (timeout {@code 10_000}) until {@code afterLeaderChangeCheckCondition(leaderName)} holds and fails the test
+ * otherwise.
+ *
+ * @param leaderName Current leader name.
+ * @throws InterruptedException If the wait is interrupted.
+ */
+ private void afterLeaderChangeCheckConditionWithWait(String leaderName) throws InterruptedException {
+ boolean conditionReached = waitForCondition(() -> afterLeaderChangeCheckCondition(leaderName), 10_000);
+
+ assertTrue(conditionReached);
+ }
+
+ /**
+ * Condition to be met after a leader change; delegates to {@code checkSingleActiveActor}.
+ *
+ * @param leaderName Current leader name.
+ * @return Whether the condition holds.
+ */
+ private boolean afterLeaderChangeCheckCondition(String leaderName) {
+ boolean singleActiveActor = checkSingleActiveActor(leaderName);
+
+ return singleActiveActor;
+ }
+
+ /**
+ * Replication group id used by this test; its string form is a fixed name.
+ */
+ public enum TestReplicationGroup implements ReplicationGroupId {
+ /** The only replication group of the test. */
+ GROUP_ID;
+
+ /** Value returned by {@link #toString()}. */
+ private static final String NAME = "TestReplicationGroup";
+
+ @Override
+ public String toString() {
+ return NAME;
+ }
+ }
+
+ /**
+ * RAFT group listener without a state machine. Every write and read command is completed with a {@code null} result,
+ * snapshot saving stores nothing and reports success, and snapshot loading always succeeds.
+ */
+ private static class TestRaftGroupListener implements RaftGroupListener {
+ @Override
+ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+ iterator.forEachRemaining(closure -> closure.result(null));
+ }
+
+ @Override
+ public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+ // Complete reads like writes; a closure that is never completed would leave the read request unanswered.
+ iterator.forEachRemaining(closure -> closure.result(null));
+ }
+
+ @Override
+ public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+ // Nothing to persist, but the completion closure must still be invoked (null = success),
+ // otherwise the snapshot would never complete.
+ doneClo.accept(null);
+ }
+
+ @Override
+ public boolean onSnapshotLoad(Path path) {
+ return true;
+ }
+
+ @Override
+ public void onShutdown() {
+ // No resources to release.
+ }
+ }
+
+ /**
+ * Test implementation of {@link LogicalTopologyService}.
+ */
+ protected static class LogicalTopologyServiceTestImpl implements LogicalTopologyService {
+ private final ClusterService clusterService;
+
+ public LogicalTopologyServiceTestImpl(ClusterService clusterService) {
+ this.clusterService = clusterService;
+ }
+
+ @Override
+ public void addEventListener(LogicalTopologyEventListener listener) {
+
+ }
+
+ @Override
+ public void removeEventListener(LogicalTopologyEventListener listener) {
+
+ }
+
+ @Override
+ public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
+ return completedFuture(new LogicalTopologySnapshot(1, clusterService.topologyService().allMembers()));
+ }
+
+ @Override
+ public CompletableFuture<Set<ClusterNode>> validatedNodesOnLeader() {
+ return completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
+ }
+ }
}
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index b929784784..eb8c04df9a 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -21,8 +21,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
-import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_NAME;
-import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_SIZE;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
@@ -39,8 +37,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -61,12 +57,12 @@ import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -108,8 +104,6 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
private PlacementDriverManager placementDriverManager;
- private ScheduledExecutorService raftExecutorService;
-
private TestInfo testInfo;
@BeforeEach
@@ -132,19 +126,22 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
when(cmgManager.metaStorageNodes())
.thenReturn(completedFuture(Set.of(clusterService.localConfiguration().getName())));
- raftExecutorService = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
- new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterService.localConfiguration().getName(),
- CLIENT_POOL_NAME), log
- ));
-
RaftGroupEventsClientListener eventsClientListener = new RaftGroupEventsClientListener();
+ LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService);
+
+ TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+ clusterService,
+ logicalTopologyService,
+ Loza.FACTORY,
+ eventsClientListener
+ );
+
raftManager = new Loza(
clusterService,
raftConfiguration,
workDir.resolve("loza"),
new HybridClockImpl(),
- raftExecutorService,
eventsClientListener
);
@@ -154,7 +151,7 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
vaultManager,
clusterService,
cmgManager,
- mock(LogicalTopologyService.class),
+ logicalTopologyService,
raftManager,
storage
);
@@ -164,17 +161,16 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
vaultManager,
MetastorageGroupId.INSTANCE,
clusterService,
- raftConfiguration,
() -> completedFuture(peersAndLearners(
new HashMap<>(Map.of(new NetworkAddress("localhost", PORT), clusterService)),
addr -> true,
1)
.peers().stream().map(Peer::consistentId).collect(toSet())),
- new LogicalTopologyServiceTestImpl(clusterService),
- raftExecutorService,
+ logicalTopologyService,
+ raftManager,
+ topologyAwareRaftGroupServiceFactory,
tblsCfg,
- clock,
- eventsClientListener
+ clock
);
vaultManager.start();
@@ -203,7 +199,6 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
raftManager.stop();
clusterService.stop();
vaultManager.stop();
- raftExecutorService.shutdown();
}
private static PeersAndLearners peersAndLearners(
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 051340efd9..ea1f96402d 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -49,12 +49,6 @@ public class LeaseUpdater {
/** Ignite logger. */
private static final IgniteLogger LOG = Loggers.forClass(LeaseUpdater.class);
- /**
- * Cluster cLock skew. The constant determines the undefined inclusive interval to compares timestamp from various nodes.
- * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
- */
- private static final long CLOCK_SKEW = 7L;
-
/** Update attempts interval in milliseconds. */
private static final long UPDATE_LEASE_MS = 200L;
@@ -152,23 +146,6 @@ public class LeaseUpdater {
}
}
- /**
- * Compares two timestamps with the clock skew.
- * t1, t2 comparable if t1 is not contained on [t2 - CLOCK_SKEW; t2 + CLOCK_SKEW].
- * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
- *
- * @param ts1 First timestamp.
- * @param ts2 Second timestamp.
- * @return Result of comparison can be positive or negative, or {@code 0} if timestamps are not comparable.
- */
- private static int compareWithClockSkew(HybridTimestamp ts1, HybridTimestamp ts2) {
- if (ts1.getPhysical() - CLOCK_SKEW <= ts2.getPhysical() && ts1.getPhysical() + CLOCK_SKEW >= ts2.getPhysical()) {
- return 0;
- }
-
- return ts1.compareTo(ts2);
- }
-
/**
* Finds a node that can be the leaseholder.
*
@@ -263,7 +240,7 @@ public class LeaseUpdater {
HybridTimestamp now = clock.now();
return lease == EMPTY_LEASE
- || (!candidate.equals(lease.getLeaseholder()) && compareWithClockSkew(now, lease.getLeaseExpirationTime()) > 0);
+ || (!candidate.equals(lease.getLeaseholder()) && now.after(lease.getLeaseExpirationTime()));
}
}
}
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 545a59fdb4..ea29a558d5 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -18,10 +18,10 @@
package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -30,16 +30,16 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.jetbrains.annotations.TestOnly;
/**
@@ -52,8 +52,6 @@ public class PlacementDriverManager implements IgniteComponent {
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- private final RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory();
-
/** Prevents double stopping of the component. */
private final AtomicBoolean isStopped = new AtomicBoolean();
@@ -66,17 +64,14 @@ public class PlacementDriverManager implements IgniteComponent {
/** The closure determines nodes where are participants of placement driver. */
private final Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider;
- /** Raft client future. Can contain null, if this node is not in placement driver group. */
- private final CompletableFuture<TopologyAwareRaftGroupService> raftClientFuture;
-
- /** Executor sends a raft requests and receives response. */
- private final ScheduledExecutorService raftClientExecutor;
+ private final RaftManager raftManager;
- /** Logical topology service. */
- private final LogicalTopologyService logicalTopologyService;
+ private final TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory;
- /** Raft configuration. */
- private final RaftConfiguration raftConfiguration;
+ /**
+ * Raft client future. Can contain null, if this node is not in placement driver group.
+ */
+ private final CompletableFuture<TopologyAwareRaftGroupService> raftClientFuture;
/** Lease tracker. */
private final LeaseTracker leaseTracker;
@@ -84,9 +79,6 @@ public class PlacementDriverManager implements IgniteComponent {
/** Lease updater. */
private final LeaseUpdater leaseUpdater;
- private final RaftGroupEventsClientListener raftGroupEventsClientListener;
-
- /** The flag is true when the instance of placement driver renews leases, false when the instance only tracks leases. */
private volatile boolean isActiveActor;
/**
@@ -96,10 +88,10 @@ public class PlacementDriverManager implements IgniteComponent {
* @param vaultManager Vault manager.
* @param replicationGroupId Id of placement driver group.
* @param clusterService Cluster service.
- * @param raftConfiguration Raft configuration.
* @param placementDriverNodesNamesProvider Provider of the set of placement driver nodes' names.
* @param logicalTopologyService Logical topology service.
- * @param raftClientExecutor Raft client executor.
+ * @param raftManager Raft manager.
+ * @param topologyAwareRaftGroupServiceFactory Raft client factory.
* @param tablesCfg Table configuration.
* @param clock Hybrid clock.
*/
@@ -108,21 +100,18 @@ public class PlacementDriverManager implements IgniteComponent {
VaultManager vaultManager,
ReplicationGroupId replicationGroupId,
ClusterService clusterService,
- RaftConfiguration raftConfiguration,
Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider,
LogicalTopologyService logicalTopologyService,
- ScheduledExecutorService raftClientExecutor,
+ RaftManager raftManager,
+ TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
TablesConfiguration tablesCfg,
- HybridClock clock,
- RaftGroupEventsClientListener raftGroupEventsClientListener
+ HybridClock clock
) {
this.replicationGroupId = replicationGroupId;
this.clusterService = clusterService;
- this.raftConfiguration = raftConfiguration;
this.placementDriverNodesNamesProvider = placementDriverNodesNamesProvider;
- this.logicalTopologyService = logicalTopologyService;
- this.raftClientExecutor = raftClientExecutor;
- this.raftGroupEventsClientListener = raftGroupEventsClientListener;
+ this.raftManager = raftManager;
+ this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
this.raftClientFuture = new CompletableFuture<>();
this.leaseTracker = new LeaseTracker(vaultManager, metaStorageMgr);
@@ -144,24 +133,17 @@ public class PlacementDriverManager implements IgniteComponent {
String thisNodeName = clusterService.topologyService().localMember().name();
if (placementDriverNodes.contains(thisNodeName)) {
- leaseUpdater.init(thisNodeName);
-
- return TopologyAwareRaftGroupService.start(
- replicationGroupId,
- clusterService,
- raftMessagesFactory,
- raftConfiguration,
- PeersAndLearners.fromConsistentIds(placementDriverNodes),
- true,
- raftClientExecutor,
- logicalTopologyService,
- raftGroupEventsClientListener,
- true
- ).thenCompose(client -> {
- TopologyAwareRaftGroupService topologyAwareClient = (TopologyAwareRaftGroupService) client;
-
- return topologyAwareClient.subscribeLeader(this::onLeaderChange).thenApply(v -> topologyAwareClient);
- });
+ try {
+ leaseUpdater.init(thisNodeName);
+
+ return raftManager.startRaftGroupService(
+ replicationGroupId,
+ PeersAndLearners.fromConsistentIds(placementDriverNodes),
+ topologyAwareRaftGroupServiceFactory
+ ).thenCompose(client -> client.subscribeLeader(this::onLeaderChange).thenApply(v -> client));
+ } catch (NodeStoppingException e) {
+ return failedFuture(e);
+ }
} else {
return completedFuture(null);
}
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index 09446af8a5..789a327f31 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -44,6 +44,24 @@ public interface RaftManager extends IgniteComponent {
RaftGroupEventsListener eventsLsnr
) throws NodeStoppingException;
+ /**
+ * Starts a Raft group and a Raft service on the current node, using the given factory to create the Raft service.
+ *
+ * @param nodeId Raft node ID.
+ * @param configuration Peers and Learners of the Raft group.
+ * @param lsnr Raft group listener.
+ * @param eventsLsnr Raft group events listener.
+ * @param factory Service factory.
+ * @param <T> Type of the Raft group service created by {@code factory}.
+ * @return Future that will be completed with the Raft group service created by {@code factory}.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+ RaftNodeId nodeId,
+ PeersAndLearners configuration,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr,
+ RaftServiceFactory<T> factory
+ ) throws NodeStoppingException;
+
/**
* Stops a given local Raft node.
*
@@ -77,4 +95,19 @@ public interface RaftManager extends IgniteComponent {
ReplicationGroupId groupId,
PeersAndLearners configuration
) throws NodeStoppingException;
+
+ /**
+ * Creates a Raft group service providing operations on a Raft group, using the given factory.
+ *
+ * @param groupId Raft group ID.
+ * @param configuration Peers and Learners of the Raft group.
+ * @param factory Factory that should be used to create raft service.
+ * @param <T> Type of the Raft group service created by {@code factory}.
+ * @return Future that will be completed with an instance of a Raft group service.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService(
+ ReplicationGroupId groupId,
+ PeersAndLearners configuration,
+ RaftServiceFactory<T> factory
+ ) throws NodeStoppingException;
}
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
new file mode 100644
index 0000000000..ecea7e524d
--- /dev/null
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/**
+ * Factory that can be used to create customized Raft service.
+ *
+ * @param <T> Type of the created Raft group service.
+ */
+public interface RaftServiceFactory<T extends RaftGroupService> {
+ /**
+ * Creates Raft group service.
+ *
+ * @param groupId Group id.
+ * @param peersAndLearners Peers configuration.
+ * @param raftConfiguration Raft configuration.
+ * @param raftClientExecutor Client executor.
+ * @return Future that contains client when completes.
+ */
+ CompletableFuture<T> startRaftGroupService(
+ ReplicationGroupId groupId,
+ PeersAndLearners peersAndLearners,
+ RaftConfiguration raftConfiguration,
+ ScheduledExecutorService raftClientExecutor
+ );
+}
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index e86f83e994..d731fc75e5 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -233,6 +233,13 @@ public interface RaftGroupService {
*/
void shutdown();
+ /**
+ * Reads index from the group leader.
+ * NOTE(review): presumably the leader's read index (commit index usable for linearizable reads); confirm against
+ * the implementation.
+ *
+ * @return Future containing the index.
+ */
+ CompletableFuture<Long> readIndex();
+
/**
* Returns a cluster service.
*
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 455ae5d49b..b452883af3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
@@ -46,6 +48,7 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -55,13 +58,13 @@ import org.jetbrains.annotations.TestOnly;
// see https://issues.apache.org/jira/browse/IGNITE-18273
public class Loza implements RaftManager {
/** Factory. */
- private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
+ public static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
/** Raft client pool name. */
public static final String CLIENT_POOL_NAME = "Raft-Group-Client";
/** Raft client pool size. Size was taken from jraft's TimeManager. */
- public static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
+ private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(Loza.class);
@@ -93,14 +96,12 @@ public class Loza implements RaftManager {
* @param raftConfiguration Raft configuration.
* @param dataPath Data path.
* @param clock A hybrid logical clock.
- * @param executor Executor for raft group services.
*/
public Loza(
ClusterService clusterNetSvc,
RaftConfiguration raftConfiguration,
Path dataPath,
HybridClock clock,
- ScheduledExecutorService executor,
RaftGroupEventsClientListener raftGroupEventsClientListener
) {
this.clusterNetSvc = clusterNetSvc;
@@ -114,7 +115,11 @@ public class Loza implements RaftManager {
this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, options, raftGroupEventsClientListener);
- this.executor = executor;
+ this.executor = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
+ new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),
+ CLIENT_POOL_NAME), LOG
+ )
+ );
}
/**
@@ -136,11 +141,6 @@ public class Loza implements RaftManager {
raftConfiguration,
dataPath,
clock,
- new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
- new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),
- CLIENT_POOL_NAME), LOG
- )
- ),
new RaftGroupEventsClientListener()
);
}
@@ -162,6 +162,8 @@ public class Loza implements RaftManager {
busyLock.block();
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+
raftServer.stop();
}
@@ -175,6 +177,17 @@ public class Loza implements RaftManager {
return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults());
}
+ @Override
+ public <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+ RaftNodeId nodeId,
+ PeersAndLearners configuration,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr,
+ RaftServiceFactory<T> factory
+ ) throws NodeStoppingException {
+ // Delegates to the overload taking explicit group options, using the default ones.
+ RaftGroupOptions defaultOptions = RaftGroupOptions.defaults();
+
+ return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, defaultOptions, factory);
+ }
+
/**
* Starts a Raft group on the current node.
*
@@ -191,13 +204,35 @@ public class Loza implements RaftManager {
RaftGroupListener lsnr,
RaftGroupEventsListener eventsLsnr,
RaftGroupOptions groupOptions
+ ) throws NodeStoppingException {
+ return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, groupOptions, null);
+ }
+
+ /**
+ * Starts a Raft group on the current node.
+ *
+ * @param nodeId Raft node ID.
+ * @param configuration Peers and Learners of the Raft group.
+ * @param lsnr Raft group listener.
+ * @param eventsLsnr Raft group events listener.
+ * @param groupOptions Options to apply to the group.
+ * @param raftServiceFactory If not null, used for creation of raft group service.
+ * @throws NodeStoppingException If node stopping intention was detected.
+ */
+ public <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+ RaftNodeId nodeId,
+ PeersAndLearners configuration,
+ RaftGroupListener lsnr,
+ RaftGroupEventsListener eventsLsnr,
+ RaftGroupOptions groupOptions,
+ @Nullable RaftServiceFactory<T> raftServiceFactory
) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
try {
- return startRaftGroupNodeInternal(nodeId, configuration, lsnr, eventsLsnr, groupOptions);
+ return startRaftGroupNodeInternal(nodeId, configuration, lsnr, eventsLsnr, groupOptions, raftServiceFactory);
} finally {
busyLock.leaveBusy();
}
@@ -219,12 +254,30 @@ public class Loza implements RaftManager {
}
}
- private CompletableFuture<RaftGroupService> startRaftGroupNodeInternal(
+ /** {@inheritDoc} */
+ @Override
+ public <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService(
+ ReplicationGroupId groupId,
+ PeersAndLearners configuration,
+ RaftServiceFactory<T> factory
+ ) throws NodeStoppingException {
+ boolean entered = busyLock.enterBusy();
+
+ if (!entered) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ // The client is built with this manager's Raft configuration and its client executor.
+ return factory.startRaftGroupService(groupId, configuration, raftConfiguration, executor);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNodeInternal(
RaftNodeId nodeId,
PeersAndLearners configuration,
RaftGroupListener lsnr,
RaftGroupEventsListener raftGrpEvtsLsnr,
- RaftGroupOptions groupOptions
+ RaftGroupOptions groupOptions,
+ @Nullable RaftServiceFactory<T> raftServiceFactory
) {
if (LOG.isInfoEnabled()) {
LOG.info("Start new raft node={} with initial configuration={}", nodeId, configuration);
@@ -239,7 +292,9 @@ public class Loza implements RaftManager {
));
}
- return startRaftGroupServiceInternal(nodeId.groupId(), configuration);
+ return raftServiceFactory == null
+ ? (CompletableFuture<T>) startRaftGroupServiceInternal(nodeId.groupId(), configuration)
+ : raftServiceFactory.startRaftGroupService(nodeId.groupId(), configuration, raftConfiguration, executor);
}
private CompletableFuture<RaftGroupService> startRaftGroupServiceInternal(
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index b5156316fd..d89fe751bd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -71,6 +71,7 @@ import org.apache.ignite.raft.jraft.rpc.ActionRequest;
import org.apache.ignite.raft.jraft.rpc.ActionResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.SMErrorResponse;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
@@ -460,6 +461,18 @@ public class RaftGroupServiceImpl implements RaftGroupService {
busyLock.block();
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>The request is sent to the known leader, or to a random peer when no leader is known.
+ */
+ @Override
+ public CompletableFuture<Long> readIndex() {
+ Peer leader = leader();
+ Peer target = leader != null ? leader : randomNode();
+
+ Function<Peer, ? extends NetworkMessage> requestFactory = peer -> factory.readIndexRequest().groupId(groupId).build();
+
+ return this.<ReadIndexResponse>sendWithRetry(target, requestFactory).thenApply(ReadIndexResponse::index);
+ }
+
@Override
public ClusterService clusterService() {
return cluster;
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index b2893a9455..c92067d7f0 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -21,8 +21,10 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.raft.TestWriteCommand.testWriteCommand;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -82,6 +84,7 @@ import org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -559,6 +562,28 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest {
assertThat(fut.thenApply(GetLeaderResponse::currentTerm), willBe(equalTo(CURRENT_TERM)));
}
+ /** Checks that a read index request resolves to the index returned by the mocked response. */
+ @Test
+ public void testReadIndex() {
+ RaftGroupService service = startRaftGroupService(NODES, false);
+ mockReadIndex(false);
+
+ CompletableFuture<Long> readIndexFut = service.readIndex();
+
+ assertThat(readIndexFut, willSucceedFast());
+ assertEquals(1L, readIndexFut.join());
+ }
+
+ /** Checks that a message send timeout fails the read index future with {@link TimeoutException}. */
+ @Test
+ public void testReadIndexWithMessageSendTimeout() {
+ RaftGroupService service = startRaftGroupService(NODES, false);
+ mockReadIndex(true);
+
+ assertThat(service.readIndex(), willThrowFast(TimeoutException.class));
+ }
+
private RaftGroupService startRaftGroupService(List<Peer> peers, boolean getLeader) {
PeersAndLearners memberConfiguration = PeersAndLearners.fromPeers(peers, Set.of());
@@ -570,6 +595,17 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest {
return service.join();
}
+ /**
+ * Mocks responses to {@link ReadIndexRequest}s.
+ *
+ * @param timeout Whether every request should fail with a {@link TimeoutException} instead of returning index {@code 1}.
+ */
+ private void mockReadIndex(boolean timeout) {
+ when(messagingService.invoke(any(ClusterNode.class), any(ReadIndexRequest.class), anyLong())).then(invocation -> {
+ if (timeout) {
+ return failedFuture(new TimeoutException());
+ }
+
+ return completedFuture(FACTORY.readIndexResponse().index(1L).build());
+ });
+ }
+
/**
* Mocks sending {@link ActionRequest}s.
*
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index 34b50e0c45..e21bb7d4b1 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -18,17 +18,29 @@
apply from: "$rootDir/buildscripts/java-core.gradle"
apply from: "$rootDir/buildscripts/publishing.gradle"
apply from: "$rootDir/buildscripts/java-junit5.gradle"
-
+apply from: "$rootDir/buildscripts/java-integration-test.gradle"
+apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
dependencies {
annotationProcessor project(":ignite-network-annotation-processor")
implementation project(':ignite-core')
+ implementation project(':ignite-raft')
implementation project(':ignite-raft-api')
+ implementation project(':ignite-configuration')
implementation project(':ignite-configuration-api')
+ implementation project(':ignite-cluster-management')
implementation project(':ignite-network-api')
+ implementation project(':ignite-placement-driver-api')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
+ integrationTestImplementation project(':ignite-raft')
+ integrationTestImplementation project(':ignite-raft-api')
+
+ testImplementation(testFixtures(project(':ignite-core')))
+ testImplementation(testFixtures(project(':ignite-network')))
+ // Same project path form as every other dependency here (no trailing colon).
+ testImplementation(testFixtures(project(':ignite-configuration')))
+ testImplementation libs.hamcrest.core
testImplementation libs.mockito.core
testImplementation libs.mockito.junit
}
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
similarity index 97%
rename from modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
rename to modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index b58507607a..b4f90d47aa 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -310,6 +310,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
*/
public CompletableFuture<Void> unsubscribeLeader() {
serverEventHandler.setOnLeaderElectedCallback(null);
+ serverEventHandler.resetLeader();
var peers = peers();
var futs = new CompletableFuture[peers.size()];
@@ -337,7 +338,9 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
+ /**
+ * Returns the leader recorded from this service's leader election notifications, falling back to the wrapped Raft
+ * client's leader when none is recorded.
+ */
@Override
public @Nullable Peer leader() {
- return raftClient.leader();
+ Peer leader = serverEventHandler.leader();
+
+ return leader == null ? raftClient.leader() : leader;
}
@Override
@@ -422,6 +425,11 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
raftClient.shutdown();
}
+ /** {@inheritDoc} Delegates to the wrapped Raft client. */
+ @Override
+ public CompletableFuture<Long> readIndex() {
+ CompletableFuture<Long> indexFuture = raftClient.readIndex();
+
+ return indexFuture;
+ }
+
@Override
public ClusterService clusterService() {
return raftClient.clusterService();
@@ -434,6 +442,9 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
/** A term of last elected leader. */
private long term = 0;
+ /** Last elected leader. */
+ private Peer leaderPeer;
+
/** A leader elected callback. */
private BiConsumer<ClusterNode, Long> onLeaderElectedCallback;
@@ -446,6 +457,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
private synchronized void onLeaderElected(ClusterNode node, long term) {
if (onLeaderElectedCallback != null && term > this.term) {
this.term = term;
+ this.leaderPeer = new Peer(node.name());
onLeaderElectedCallback.accept(node, term);
}
@@ -473,5 +485,13 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
public void accept(ClusterNode clusterNode, Long term) {
onLeaderElected(clusterNode, term);
}
+
+ /**
+ * Returns the leader recorded by the last accepted leader election notification, or {@code null} if there is none.
+ * Synchronized because {@code leaderPeer} is written under this object's monitor in {@code onLeaderElected}.
+ */
+ synchronized @Nullable Peer leader() {
+ return leaderPeer;
+ }
+
+ /** Forgets the recorded leader. Synchronized for the same reason as {@link #leader()}. */
+ synchronized void resetLeader() {
+ leaderPeer = null;
+ }
}
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
new file mode 100644
index 0000000000..8bb79db8a9
--- /dev/null
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
+
+/**
+ * {@link RaftServiceFactory} that creates {@link TopologyAwareRaftGroupService} instances using the cluster service, logical
+ * topology service, Raft messages factory and Raft events client listener supplied at construction time.
+ */
+public class TopologyAwareRaftGroupServiceFactory implements RaftServiceFactory<TopologyAwareRaftGroupService> {
+ /** Cluster service passed to every created Raft client. */
+ private final ClusterService clusterService;
+
+ /** Logical topology service passed to every created Raft client. */
+ private final LogicalTopologyService logicalTopologyService;
+
+ /** Factory of Raft messages. */
+ private final RaftMessagesFactory raftMessagesFactory;
+
+ /** Client-side listener of Raft group events. */
+ private final RaftGroupEventsClientListener eventsClientListener;
+
+ /**
+ * Constructor.
+ *
+ * @param clusterService Cluster service.
+ * @param logicalTopologyService Logical topology service.
+ * @param raftMessagesFactory Raft messages factory.
+ * @param eventsClientListener Raft events client listener.
+ */
+ public TopologyAwareRaftGroupServiceFactory(
+ ClusterService clusterService,
+ LogicalTopologyService logicalTopologyService,
+ RaftMessagesFactory raftMessagesFactory,
+ RaftGroupEventsClientListener eventsClientListener
+ ) {
+ this.eventsClientListener = eventsClientListener;
+ this.raftMessagesFactory = raftMessagesFactory;
+ this.logicalTopologyService = logicalTopologyService;
+ this.clusterService = clusterService;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<TopologyAwareRaftGroupService> startRaftGroupService(
+ ReplicationGroupId groupId,
+ PeersAndLearners peersAndLearners,
+ RaftConfiguration raftConfiguration,
+ ScheduledExecutorService raftClientExecutor
+ ) {
+ // NOTE(review): the meaning of the two `true` flags depends on TopologyAwareRaftGroupService.start's signature,
+ // which is not visible here — confirm and consider naming them.
+ return TopologyAwareRaftGroupService.start(
+ groupId,
+ clusterService,
+ raftMessagesFactory,
+ raftConfiguration,
+ peersAndLearners,
+ true,
+ raftClientExecutor,
+ logicalTopologyService,
+ eventsClientListener,
+ true
+ ).thenApply(service -> (TopologyAwareRaftGroupService) service);
+ }
+}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index e646c3760e..df11a77ba3 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -17,33 +17,87 @@
package org.apache.ignite.internal.replicator;
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccess;
+
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkMessage;
/**
* Replica server.
*/
public class Replica {
+ /** The logger. */
+ private static final IgniteLogger LOG = Loggers.forClass(Replica.class);
+
+ /** Factory of responses sent back to the placement driver. */
+ private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
/** Replica group identity, this id is the same as the considered partition's id. */
private final ReplicationGroupId replicaGrpId;
/** Replica listener. */
private final ReplicaListener listener;
+ /** Storage index tracker. */
+ private final PendingComparableValuesTracker<Long> storageIndexTracker;
+
+ /** Topology aware Raft client. */
+ private final TopologyAwareRaftGroupService raftClient;
+
+ /** Instance of the local node. */
+ private final ClusterNode localNode;
+
+ // TODO IGNITE-18960 after replica inoperability logic is introduced, this future should be replaced with something like
+ // VersionedValue (so that PlacementDriverMessages would wait for new leader election)
+ /** Completed with {@link #leaderRef} when the first leader election notification is received. */
+ private final CompletableFuture<AtomicReference<ClusterNode>> leaderFuture = new CompletableFuture<>();
+
+ /** Leader reported by the most recent leader election notification. */
+ private final AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+
+ /** Latest accepted lease expiration time, {@code null} until a lease has been accepted. */
+ private volatile HybridTimestamp leaseExpirationTime = null;
+
/**
* The constructor of a replica server.
+ * Registers {@code onLeaderElected} as the leader election callback of {@code raftClient}.
*
* @param replicaGrpId Replication group id.
* @param listener Replica listener.
+ * @param storageIndexTracker Storage index tracker.
+ * @param raftClient Topology aware Raft client.
+ * @param localNode Instance of the local node.
*/
public Replica(
ReplicationGroupId replicaGrpId,
- ReplicaListener listener
+ ReplicaListener listener,
+ PendingComparableValuesTracker<Long> storageIndexTracker,
+ TopologyAwareRaftGroupService raftClient,
+ ClusterNode localNode
) {
this.replicaGrpId = replicaGrpId;
this.listener = listener;
+ this.storageIndexTracker = storageIndexTracker;
+ this.raftClient = raftClient;
+ this.localNode = localNode;
+
+ // Kept as the last statement: `this` escapes to the Raft client here, so every field must already be assigned in case
+ // the callback is invoked (presumably from another thread) right away.
+ raftClient.subscribeLeader(this::onLeaderElected);
}
/**
@@ -69,4 +123,122 @@ public class Replica {
public ReplicationGroupId groupId() {
return replicaGrpId;
}
+
+ /**
+ * Leader election callback registered with the Raft client. Remembers the reported leader and, on the first call, completes
+ * {@code leaderFuture}.
+ */
+ private void onLeaderElected(ClusterNode clusterNode, Long term) {
+ leaderRef.set(clusterNode);
+
+ // complete() has no effect on an already completed future, so only the first election completes it.
+ leaderFuture.complete(leaderRef);
+ }
+
+ /** Returns a future of the latest known leader; it completes once the first leader election has been observed. */
+ private CompletableFuture<ClusterNode> leaderFuture() {
+ return leaderFuture.thenApply(ref -> ref.get());
+ }
+
+ /**
+ * Dispatches a placement driver message to its handler.
+ *
+ * @param msg Message to process.
+ * @return Future with the response; failed with an {@link AssertionError} for unsupported message types.
+ */
+ public CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage(PlacementDriverReplicaMessage msg) {
+ if (!(msg instanceof LeaseGrantedMessage)) {
+ return failedFuture(new AssertionError("Unknown message type, msg=" + msg));
+ }
+
+ LeaseGrantedMessage leaseGrantedMsg = (LeaseGrantedMessage) msg;
+
+ return processLeaseGrantedMessage(leaseGrantedMsg);
+ }
+
+ /**
+ * Processes a lease granted message. The lease is either accepted or declined with a proposal to redirect it to the current
+ * group leader. A non-forced lease is accepted only if this replica is the group leader. A forced lease is always accepted;
+ * if this replica is not the group leader, leadership is transferred to it first. The lease is recorded as accepted only
+ * after that transfer succeeds.
+ *
+ * <p>Before accepting, the replica waits until its local storage catches up with the leader's read index. If that does not
+ * happen before the lease expires, the returned future fails.
+ *
+ * @param msg Message to process.
+ * @return Future that contains a result.
+ */
+ public CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) {
+ LOG.info("Received LeaseGrantedMessage for replica belonging to group={}, force={}", groupId(), msg.force());
+
+ return leaderFuture().thenCompose(leader -> {
+ HybridTimestamp leaseExpirationTime = this.leaseExpirationTime;
+
+ if (leaseExpirationTime != null) {
+ assert msg.leaseExpirationTime().after(leaseExpirationTime) : "Invalid lease expiration time in message, msg=" + msg;
+ }
+
+ boolean isLocalLeader = leader.equals(localNode);
+
+ if (!msg.force() && !isLocalLeader) {
+ // A non-forced lease is accepted only by the current leader; propose the leader to the placement driver instead.
+ return proposeLeaseRedirect(leader);
+ }
+
+ // Replica must wait till storage index reaches the current leader's index to make sure that all updates made on the
+ // group leader are received.
+ CompletableFuture<Void> readyFut = waitForActualState(msg.leaseExpirationTime().getPhysical());
+
+ if (!isLocalLeader) {
+ // Forced lease on a non-leader: move leadership here before recording the lease, so that a failed transfer
+ // does not leave a locally accepted lease behind.
+ readyFut = readyFut.thenCompose(v -> raftClient.transferLeadership(new Peer(localNode.name())));
+ }
+
+ return readyFut.thenCompose(v -> acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()));
+ });
+ }
+
+ /**
+ * Records the lease as accepted by this replica and builds a positive response.
+ *
+ * @param leaseStartTime Lease start time.
+ * @param leaseExpirationTime Lease expiration time; remembered to validate the expiration time of later lease grants.
+ * @return Completed future with an accepting response.
+ */
+ private CompletableFuture<LeaseGrantedMessageResponse> acceptLease(
+ HybridTimestamp leaseStartTime,
+ HybridTimestamp leaseExpirationTime
+ ) {
+ LOG.info("Lease accepted, group={}, leaseStartTime={}, leaseExpirationTime={}", groupId(), leaseStartTime, leaseExpirationTime);
+
+ this.leaseExpirationTime = leaseExpirationTime;
+
+ LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
+ .accepted(true)
+ .build();
+
+ return completedFuture(resp);
+ }
+
+ /**
+ * Declines the lease and proposes the current group leader as the lease holder instead.
+ *
+ * @param groupLeader Current leader of the replication group.
+ * @return Completed future with a declining response that carries the redirect proposal.
+ */
+ private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(ClusterNode groupLeader) {
+ LOG.info("Proposing lease redirection, group={}, proposed node={}", groupId(), groupLeader);
+
+ LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
+ .accepted(false)
+ .redirectProposal(groupLeader.name())
+ .build();
+
+ return completedFuture(resp);
+ }
+
+ /**
+ * Tries to read the index from the group leader and waits for this index to appear in local storage. May return a future
+ * failed with a timeout exception. In that case the replica does not answer the placement driver, because the response
+ * would be useless; the placement driver should handle this.
+ *
+ * @param expirationTime Lease expiration time, in the same units as {@link System#currentTimeMillis()}.
+ * @return Future that completes when local storage catches up with the index that was actual for the leader at the moment
+ * of the request.
+ */
+ private CompletableFuture<Void> waitForActualState(long expirationTime) {
+ LOG.info("Waiting for actual storage state, group={}", groupId());
+
+ long timeout = expirationTime - currentTimeMillis();
+ if (timeout <= 0) {
+ return failedFuture(new TimeoutException());
+ }
+
+ // NOTE(review): retries are dispatched via Runnable::run; unless retryOperationUntilSuccess adds a delay, readIndex is
+ // re-issued back-to-back until expirationTime passes — confirm.
+ return retryOperationUntilSuccess(raftClient::readIndex, e -> currentTimeMillis() > expirationTime, Runnable::run)
+ .thenCompose(storageIndexTracker::waitFor)
+ // Bound the whole chain, including the wait for local storage, not just the read index retries.
+ .orTimeout(timeout, TimeUnit.MILLISECONDS);
+ }
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index bcbaff0680..9a72bcea3a 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.replicator;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.util.Set;
@@ -31,6 +32,9 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -43,8 +47,10 @@ import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
@@ -79,6 +85,9 @@ public class ReplicaManager implements IgniteComponent {
/** Replica message handler. */
private final NetworkMessageHandler handler;
+ /** Message handler for placement driver messages. */
+ private final NetworkMessageHandler placementDriverMessageHandler;
+
/** Replicas. */
private final ConcurrentHashMap<ReplicationGroupId, CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
@@ -93,7 +102,7 @@ public class ReplicaManager implements IgniteComponent {
private final Set<Class<?>> messageGroupsToHandle;
/**
- * Constructor for a replica service.
+ * Constructor for a replica service.
*
* @param clusterNetSvc Cluster network service.
* @param clock A hybrid logical clock.
@@ -106,80 +115,111 @@ public class ReplicaManager implements IgniteComponent {
this.clusterNetSvc = clusterNetSvc;
this.clock = clock;
this.messageGroupsToHandle = messageGroupsToHandle;
- this.handler = (message, senderConsistentId, correlationId) -> {
- if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ this.handler = this::onReplicaMessageReceived;
+ this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived;
+ }
+
+ /**
+ * Handles a replica request received over the network.
+ *
+ * <ul>
+ * <li>An {@link AwaitReplicaRequest} is answered once the target replica is started.</li>
+ * <li>Other requests get a "replica unavailable" error response if the replica is not started yet.</li>
+ * <li>Otherwise the request is passed to {@link Replica#processRequest}, and its result or error is sent back to the
+ * sender.</li>
+ * </ul>
+ *
+ * @param message Received message; messages that are not {@link ReplicaRequest}s are ignored.
+ * @param senderConsistentId Consistent ID of the sender node.
+ * @param correlationId Correlation ID of the request, used when responding.
+ * @throws IgniteException If the node is stopping.
+ */
+ private void onReplicaMessageReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+
+ try {
+ if (!(message instanceof ReplicaRequest)) {
+ return;
}
- try {
- if (!(message instanceof ReplicaRequest)) {
- return;
- }
+ ReplicaRequest request = (ReplicaRequest) message;
- ReplicaRequest request = (ReplicaRequest) message;
+ // Notify the sender that the Replica is created and ready to process requests.
+ if (request instanceof AwaitReplicaRequest) {
+ replicas.compute(request.groupId(), (replicationGroupId, replicaFut) -> {
+ if (replicaFut == null) {
+ replicaFut = new CompletableFuture<>();
+ }
- // Notify the sender that the Replica is created and ready to process requests.
- if (request instanceof AwaitReplicaRequest) {
- replicas.compute(request.groupId(), (replicationGroupId, replicaFut) -> {
- if (replicaFut == null) {
- replicaFut = new CompletableFuture<>();
- }
+ if (!replicaFut.isDone()) {
+ // NOTE(review): this callback returns null, so the future derived by thenCompose completes with a
+ // NullPointerException. That future is discarded, so this is harmless, but thenRun would state the intent.
+ replicaFut.thenCompose(
+ ignore -> {
+ IgniteUtils.inBusyLock(
+ busyLock,
+ () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
+ );
- if (!replicaFut.isDone()) {
- replicaFut.thenCompose(
- ignore -> {
- IgniteUtils.inBusyLock(
- busyLock,
- () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
- );
+ return null;
+ }
+ );
- return null;
- }
- );
+ return replicaFut;
+ } else {
+ IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(senderConsistentId, correlationId));
- return replicaFut;
- } else {
- IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(senderConsistentId, correlationId));
+ return replicaFut;
+ }
+ });
- return replicaFut;
- }
- });
+ return;
+ }
- return;
- }
+ CompletableFuture<Replica> replicaFut = replicas.get(request.groupId());
+
+ HybridTimestamp requestTimestamp = extractTimestamp(request);
+
+ if (replicaFut == null || !replicaFut.isDone()) {
+ sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, request.groupId(), requestTimestamp);
+
+ return;
+ }
- CompletableFuture<Replica> replicaFut = replicas.get(request.groupId());
+ // replicaFut is always completed here.
+ CompletableFuture<?> result = replicaFut.join().processRequest(request);
- HybridTimestamp requestTimestamp = extractTimestamp(request);
+ result.handle((res, ex) -> {
+ NetworkMessage msg;
- if (replicaFut == null || !replicaFut.isDone()) {
- sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, request, requestTimestamp);
+ if (ex == null) {
+ msg = prepareReplicaResponse(requestTimestamp, res);
+ } else {
+ LOG.warn("Failed to process replica request [request={}]", ex, request);
- return;
+ msg = prepareReplicaErrorResponse(requestTimestamp, ex);
}
- // replicaFut is always completed here.
- CompletableFuture<?> result = replicaFut.join().processRequest(request);
+ clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId);
- result.handle((res, ex) -> {
- NetworkMessage msg;
+ return null;
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
- if (ex == null) {
- msg = prepareReplicaResponse(requestTimestamp, res);
- } else {
- LOG.warn("Failed to process replica request [request={}]", ex, request);
+ /**
+ * Handles a message sent by the placement driver. The message is passed to the target replica once that replica is
+ * started; an incomplete future is registered in {@code replicas} if the replica is not known yet. The replica's response
+ * is sent back to the sender. If processing fails, the error is logged and no response is sent.
+ *
+ * @param msg0 Received message; expected to be a {@link PlacementDriverReplicaMessage}.
+ * @param senderConsistentId Consistent ID of the sender node.
+ * @param correlationId Correlation ID of the message, used when responding.
+ * @throws IgniteException If the node is stopping.
+ */
+ private void onPlacementDriverMessageReceived(NetworkMessage msg0, String senderConsistentId, @Nullable Long correlationId) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
- msg = prepareReplicaErrorResponse(requestTimestamp, ex);
- }
+ try {
+ assert msg0 instanceof PlacementDriverReplicaMessage : "Unexpected message type, msg=" + msg0;
- clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId);
+ PlacementDriverReplicaMessage msg = (PlacementDriverReplicaMessage) msg0;
- return null;
- });
- } finally {
- busyLock.leaveBusy();
- }
- };
+ // NOTE(review): if no replica for this group is ever started on this node, the future registered here presumably
+ // stays incomplete in `replicas` — confirm this is acceptable.
+ CompletableFuture<Replica> replicaFut = replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture<>());
+
+ replicaFut
+ .thenCompose(replica -> replica.processPlacementDriverMessage(msg))
+ .handle((response, ex) -> {
+ if (ex == null) {
+ clusterNetSvc.messagingService().respond(senderConsistentId, response, correlationId);
+ } else {
+ // No response on failure; per Replica#waitForActualState, the placement driver handles the missing answer.
+ LOG.error("Failed to process placement driver message [msg={}]", ex, msg);
+ }
+
+ return null;
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -187,20 +227,25 @@ public class ReplicaManager implements IgniteComponent {
*
* @param replicaGrpId Replication group id.
* @param listener Replica listener.
+ * @param raftClient Topology aware Raft client.
+ * @param storageIndexTracker Storage index tracker.
+ *
* @return New replica.
* @throws NodeStoppingException If node is stopping.
* @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been started.
*/
public Replica startReplica(
ReplicationGroupId replicaGrpId,
- ReplicaListener listener
+ ReplicaListener listener,
+ TopologyAwareRaftGroupService raftClient,
+ PendingComparableValuesTracker<Long> storageIndexTracker
) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
try {
- return startReplicaInternal(replicaGrpId, listener);
+ return startReplicaInternal(replicaGrpId, listener, raftClient, storageIndexTracker);
} finally {
busyLock.leaveBusy();
}
@@ -211,20 +256,25 @@ public class ReplicaManager implements IgniteComponent {
*
* @param replicaGrpId Replication group id.
* @param listener Replica listener.
+ * @param raftClient Topology aware Raft client.
+ * @param storageIndexTracker Storage index tracker.
* @return New replica.
*/
private Replica startReplicaInternal(
ReplicationGroupId replicaGrpId,
- ReplicaListener listener
+ ReplicaListener listener,
+ TopologyAwareRaftGroupService raftClient,
+ PendingComparableValuesTracker<Long> storageIndexTracker
) {
- Replica newReplica = new Replica(replicaGrpId, listener);
+ ClusterNode localNode = clusterNetSvc.topologyService().localMember();
+ Replica newReplica = new Replica(replicaGrpId, listener, storageIndexTracker, raftClient, localNode);
replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
if (replicaFut == null) {
- return CompletableFuture.completedFuture(newReplica);
+ return completedFuture(newReplica);
} else {
if (replicaFut.isDone() && !replicaFut.isCancelled() && !replicaFut.isCompletedExceptionally()) {
- return CompletableFuture.completedFuture(newReplica);
+ return completedFuture(newReplica);
}
replicaFut.complete(newReplica);
@@ -269,6 +319,7 @@ public class ReplicaManager implements IgniteComponent {
@Override
public void start() {
clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, handler);
+ clusterNetSvc.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, placementDriverMessageHandler);
messageGroupsToHandle.forEach(mg -> clusterNetSvc.messagingService().addMessageHandler(mg, handler));
scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate(
this::idleSafeTimeSync,
@@ -309,8 +360,8 @@ public class ReplicaManager implements IgniteComponent {
private void sendReplicaUnavailableErrorResponse(
String senderConsistentId,
@Nullable Long correlationId,
- ReplicaRequest request,
- HybridTimestamp requestTimestamp
+ ReplicationGroupId groupId,
+ @Nullable HybridTimestamp requestTimestamp
) {
if (requestTimestamp != null) {
clusterNetSvc.messagingService().respond(
@@ -319,7 +370,7 @@ public class ReplicaManager implements IgniteComponent {
.errorTimestampAwareReplicaResponse()
.throwable(
new ReplicaUnavailableException(
- request.groupId(),
+ groupId,
clusterNetSvc.topologyService().localMember())
)
.timestamp(clock.update(requestTimestamp))
@@ -332,7 +383,7 @@ public class ReplicaManager implements IgniteComponent {
.errorReplicaResponse()
.throwable(
new ReplicaUnavailableException(
- request.groupId(),
+ groupId,
clusterNetSvc.topologyService().localMember())
)
.build(),
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
similarity index 84%
rename from modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
rename to modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
index 52bda8200a..b41e5e6485 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft.client;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest.TestReplicationGroup.GROUP_ID;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -68,7 +69,6 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.CliRequests.LeaderChangeNotification;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -85,10 +85,10 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
private static final int PORT_BASE = 1234;
@InjectConfiguration
- protected RaftConfiguration raftConfiguration;
+ private RaftConfiguration raftConfiguration;
/** RPC executor. */
- protected ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", log));
+ private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", log));
@Test
public void testOneNodeReplicationGroup(TestInfo testInfo) throws Exception {
@@ -113,8 +113,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
assertNotNull(leader);
- afterInitCheckConditionWithWait(leader.name());
-
stopCluster(clusterServices, raftServers, List.of(raftClient), nodes);
}
@@ -173,14 +171,10 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
log.info("Leader: " + leader);
- afterInitCheckConditionWithWait(leader.name());
-
var raftServiceToStop = raftServers.remove(new NetworkAddress("localhost", leader.address().port()));
raftServiceToStop.stopRaftNodes(GROUP_ID);
raftServiceToStop.stop();
- afterNodeStop(leader.name());
-
clusterServices.remove(new NetworkAddress("localhost", leader.address().port())).stop();
assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()), 10_000));
@@ -188,8 +182,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
log.info("New Leader: " + leaderRef.get());
- afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
-
raftClientNoInitialNotify.refreshLeader().get();
assertEquals(raftClientNoInitialNotify.leader().consistentId(), leaderRef.get().name());
@@ -254,8 +246,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
log.info("Leader: " + leader);
- afterInitCheckConditionWithWait(leader.name());
-
Peer newLeaderPeer = raftClient.peers().stream().filter(peer -> !leader.name().equals(peer.consistentId())).findAny().get();
log.info("Peer to transfer leader: " + newLeaderPeer);
@@ -271,8 +261,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
log.info("New Leader: " + leaderRef.get());
- afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
-
raftClient.refreshLeader().get();
assertEquals(raftClient.leader().consistentId(), leaderRef.get().name());
@@ -359,9 +347,11 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
var localPeer = peersAndLearners.peers().stream()
.filter(peer -> peer.consistentId().equals(cluster.topologyService().localMember().name())).findAny().get();
+ var dataPath = workDir.resolve("raft_" + localPeer.consistentId());
+
var raftServer = new JraftServerImpl(
cluster,
- workDir.resolve("raft_" + localPeer.consistentId()),
+ dataPath,
new NodeOptions(),
eventsClientListener
);
@@ -375,8 +365,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
);
raftServers.put(addr, raftServer);
-
- afterNodeStart(localPeer.consistentId(), cluster, placementDriverNodesNames, eventsClientListener);
}
if (addr.port() == clientPort) {
@@ -449,77 +437,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
return addresses;
}
- @AfterEach
- protected void afterTest() throws Exception {
- // No-op.
- }
-
- /**
- * The method is called after every node of the cluster starts.
- *
- * @param nodeName Node name.
- * @param clusterService Cluster service.
- * @param placementDriverNodesNames Names of all nodes in raft group.
- * @param eventsClientListener Raft events listener for client.
- */
- protected void afterNodeStart(
- String nodeName,
- ClusterService clusterService,
- Set<String> placementDriverNodesNames,
- RaftGroupEventsClientListener eventsClientListener
- ) {
- // No-op.
- }
-
- /**
- * The method is called after every node of the cluster stops.
- *
- * @param nodeName Node name.
- */
- protected void afterNodeStop(String nodeName) {
- // No-op.
- }
-
- /**
- * Checks the condition after cluster and raft clients initialization, waiting for this condition.
- *
- * @param leaderName Current leader name.
- * @throws InterruptedException If failed.
- */
- private void afterInitCheckConditionWithWait(String leaderName) throws InterruptedException {
- assertTrue(waitForCondition(() -> afterInitCheckCondition(leaderName), 10_000));
- }
-
- /**
- * Checks the condition after cluster and raft clients initialization.
- *
- * @param leaderName Current leader name.
- * @return Condition result.
- */
- protected boolean afterInitCheckCondition(String leaderName) {
- return true;
- }
-
- /**
- * Checks the condition after leader change, waiting for this condition.
- *
- * @param leaderName Current leader name.
- * @throws InterruptedException If failed.
- */
- private void afterLeaderChangeCheckConditionWithWait(String leaderName) throws InterruptedException {
- assertTrue(waitForCondition(() -> afterLeaderChangeCheckCondition(leaderName), 10_000));
- }
-
- /**
- * Checks the condition after leader change.
- *
- * @param leaderName Current leader name.
- * @return Condition result.
- */
- protected boolean afterLeaderChangeCheckCondition(String leaderName) {
- return true;
- }
-
/**
* Replication test group class.
*/
@@ -563,7 +480,7 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
/**
* Test implementation of {@link LogicalTopologyService}.
*/
- protected static class LogicalTopologyServiceTestImpl implements LogicalTopologyService {
+ private static class LogicalTopologyServiceTestImpl implements LogicalTopologyService {
private final ClusterService clusterService;
public LogicalTopologyServiceTestImpl(ClusterService clusterService) {
@@ -582,12 +499,12 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
@Override
public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
- return CompletableFuture.completedFuture(new LogicalTopologySnapshot(1, clusterService.topologyService().allMembers()));
+ return completedFuture(new LogicalTopologySnapshot(1, clusterService.topologyService().allMembers()));
}
@Override
public CompletableFuture<Set<ClusterNode>> validatedNodesOnLeader() {
- return CompletableFuture.completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
+ return completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
}
}
}
diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
new file mode 100644
index 0000000000..da4e7a5b11
--- /dev/null
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for placement driver messages processing on replica side.
+ */
+public class PlacementDriverReplicaSideTest {
+    /** Replication group id of the replica under test. */
+    private static final ReplicationGroupId GRP_ID = new ReplicationGroupId() {
+    };
+
+    /** Node that hosts the replica under test. */
+    private static final ClusterNode LOCAL_NODE = new ClusterNode("id0", "name0", new NetworkAddress("localhost", 1234));
+
+    /** Another node of the replication group. */
+    private static final ClusterNode ANOTHER_NODE = new ClusterNode("id1", "name1", new NetworkAddress("localhost", 2345));
+
+    private static final PlacementDriverMessagesFactory MSG_FACTORY = new PlacementDriverMessagesFactory();
+
+    /** Replica under test, recreated before each test. */
+    private Replica replica;
+
+    /** Leader-change callback registered by the replica through the mocked {@code subscribeLeader}. */
+    private final AtomicReference<BiConsumer<ClusterNode, Long>> callbackHolder = new AtomicReference<>();
+
+    /** Tracker of the index applied to the local storage. */
+    private PendingComparableValuesTracker<Long> storageIndexTracker;
+
+    /** Index returned by the mocked {@code readIndex} call. */
+    private final AtomicLong indexOnLeader = new AtomicLong(0);
+
+    /** Peer passed to the last mocked {@code transferLeadership} call, {@code null} if there was none. */
+    private Peer currentLeader = null;
+
+    /** Number of upcoming mocked {@code readIndex} calls that should fail with {@link TimeoutException}. */
+    private int countOfTimeoutExceptionsOnReadIndexToThrow = 0;
+
+    /**
+     * Creates a replica on {@link #LOCAL_NODE} backed by a mocked topology aware Raft client.
+     *
+     * @return New replica.
+     */
+    private Replica startReplica() {
+        TopologyAwareRaftGroupService raftClient = mock(TopologyAwareRaftGroupService.class);
+
+        // Capture the leader callback so that tests can imitate leader elections.
+        when(raftClient.subscribeLeader(any())).thenAnswer(invocationOnMock -> {
+            BiConsumer<ClusterNode, Long> callback = invocationOnMock.getArgument(0);
+            callbackHolder.set(callback);
+
+            return completedFuture(null);
+        });
+
+        // Remember the peer the replica tries to transfer leadership to.
+        when(raftClient.transferLeadership(any())).thenAnswer(invocationOnMock -> {
+            Peer peer = invocationOnMock.getArgument(0);
+            currentLeader = peer;
+
+            return completedFuture(null);
+        });
+
+        // Fail the configured number of readIndex calls, then return the index on leader.
+        when(raftClient.readIndex()).thenAnswer(invocationOnMock -> {
+            if (countOfTimeoutExceptionsOnReadIndexToThrow > 0) {
+                countOfTimeoutExceptionsOnReadIndexToThrow--;
+                return failedFuture(new TimeoutException());
+            } else {
+                return completedFuture(indexOnLeader.get());
+            }
+        });
+
+        return new Replica(
+                GRP_ID,
+                mock(ReplicaListener.class),
+                storageIndexTracker,
+                raftClient,
+                LOCAL_NODE
+        );
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        storageIndexTracker = new PendingComparableValuesTracker<>(0L);
+        indexOnLeader.set(1L);
+        currentLeader = null;
+        countOfTimeoutExceptionsOnReadIndexToThrow = 0;
+        replica = startReplica();
+    }
+
+    /**
+     * Imitates leader election for the group by invoking the leader callback registered by the replica, if any.
+     *
+     * @param leader The leader.
+     */
+    private void leaderElection(ClusterNode leader) {
+        BiConsumer<ClusterNode, Long> callback = callbackHolder.get();
+
+        if (callback != null) {
+            callback.accept(leader, 1L);
+        }
+    }
+
+    /**
+     * Imitates sending {@link org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage} to the replica.
+     *
+     * @param leaseStartTime Lease start time.
+     * @param leaseExpirationTime Lease expiration time.
+     * @param force Force flag.
+     * @return Future that is completed when replica sends a response.
+     */
+    private CompletableFuture<LeaseGrantedMessageResponse> sendLeaseGranted(
+            HybridTimestamp leaseStartTime,
+            HybridTimestamp leaseExpirationTime,
+            boolean force
+    ) {
+        PlacementDriverReplicaMessage msg = MSG_FACTORY.leaseGrantedMessage()
+                .leaseStartTime(leaseStartTime)
+                .leaseExpirationTime(leaseExpirationTime)
+                .force(force)
+                .build();
+
+        return replica.processPlacementDriverMessage(msg).thenApply(LeaseGrantedMessageResponse.class::cast);
+    }
+
+    /**
+     * Creates a hybrid timestamp shifted from the current wall-clock time.
+     *
+     * @param physical Offset from the current time, in seconds.
+     * @return Hybrid timestamp with zero logical part.
+     */
+    private static HybridTimestamp hts(long physical) {
+        return new HybridTimestamp(currentTimeMillis() + physical * 1000, 0);
+    }
+
+    /**
+     * Imitates applying an index to the local storage.
+     *
+     * @param index Applied index.
+     */
+    private void updateIndex(long index) {
+        storageIndexTracker.update(index);
+    }
+
+    @Test
+    public void replicationGroupReadinessAwait() {
+        updateIndex(1L);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(10), hts(20), false);
+        assertFalse(respFut.isDone());
+        leaderElection(LOCAL_NODE);
+        assertTrue(respFut.isDone());
+    }
+
+    @Test
+    public void replicationGroupReadinessAwaitAnotherNodeLeader() {
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(10), hts(20), false);
+        assertFalse(respFut.isDone());
+        leaderElection(ANOTHER_NODE);
+        assertTrue(respFut.isDone());
+    }
+
+    @Test
+    public void testGrantLeaseToLeader() {
+        leaderElection(LOCAL_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(10), hts(20), false);
+
+        updateIndex(1L);
+        assertTrue(respFut.isDone());
+
+        LeaseGrantedMessageResponse resp = respFut.join();
+        assertTrue(resp.accepted());
+        assertNull(resp.redirectProposal());
+    }
+
+    @Test
+    public void testGrantLeaseToNonLeader() {
+        leaderElection(ANOTHER_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(10), hts(20), false);
+
+        assertTrue(respFut.isDone());
+
+        LeaseGrantedMessageResponse resp = respFut.join();
+        assertFalse(resp.accepted());
+        assertEquals(ANOTHER_NODE.name(), resp.redirectProposal());
+    }
+
+    @Test
+    public void testGrantLeaseRepeat() {
+        long leaseStartTime = 10;
+        leaderElection(ANOTHER_NODE);
+        // Sending message with force == true.
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), true);
+
+        updateIndex(1L);
+        assertTrue(respFut0.isDone());
+
+        LeaseGrantedMessageResponse resp0 = respFut0.join();
+        assertTrue(resp0.accepted());
+        assertNull(resp0.redirectProposal());
+
+        // Sending the same message once again, with force == false (placement driver actor may have changed and the new lease interval
+        // intersects with previous one).
+        CompletableFuture<LeaseGrantedMessageResponse> respFut1 =
+                sendLeaseGranted(hts(leaseStartTime + 8), hts(leaseStartTime + 18), false);
+
+        assertTrue(respFut1.isDone());
+
+        LeaseGrantedMessageResponse resp1 = respFut1.join();
+        assertFalse(resp1.accepted());
+        assertEquals(ANOTHER_NODE.name(), resp1.redirectProposal());
+    }
+
+    @Test
+    public void testGrantLeaseToNodeWithExpiredLease() {
+        long leaseStartTime = 10;
+        updateIndex(1L);
+        leaderElection(LOCAL_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), false);
+
+        assertTrue(respFut0.isDone());
+
+        LeaseGrantedMessageResponse resp0 = respFut0.join();
+        assertTrue(resp0.accepted());
+        assertNull(resp0.redirectProposal());
+
+        CompletableFuture<LeaseGrantedMessageResponse> respFut1 =
+                sendLeaseGranted(hts(leaseStartTime + 11), hts(leaseStartTime + 20), false);
+        assertTrue(respFut1.isDone());
+
+        LeaseGrantedMessageResponse resp1 = respFut1.join();
+        assertTrue(resp1.accepted());
+        assertNull(resp1.redirectProposal());
+    }
+
+    @Test
+    public void testGrantLeaseToNodeWithExpiredLeaseAndAnotherLeaderElected() {
+        long leaseStartTime = 10;
+        updateIndex(1L);
+        leaderElection(LOCAL_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), false);
+
+        assertTrue(respFut0.isDone());
+
+        LeaseGrantedMessageResponse resp0 = respFut0.join();
+        assertTrue(resp0.accepted());
+        assertNull(resp0.redirectProposal());
+
+        leaderElection(ANOTHER_NODE);
+
+        CompletableFuture<LeaseGrantedMessageResponse> respFut1 =
+                sendLeaseGranted(hts(leaseStartTime + 11), hts(leaseStartTime + 20), false);
+        assertTrue(respFut1.isDone());
+
+        LeaseGrantedMessageResponse resp1 = respFut1.join();
+        assertFalse(resp1.accepted());
+        assertEquals(ANOTHER_NODE.name(), resp1.redirectProposal());
+    }
+
+    @Test
+    public void testForce() {
+        long leaseStartTime = 10;
+        leaderElection(ANOTHER_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), true);
+
+        assertFalse(respFut.isDone());
+
+        updateIndex(1L);
+        assertTrue(respFut.isDone());
+
+        LeaseGrantedMessageResponse resp = respFut.join();
+        assertTrue(resp.accepted());
+        assertNull(resp.redirectProposal());
+
+        // Replica should initiate the leadership transfer.
+        assertEquals(LOCAL_NODE.name(), currentLeader.consistentId());
+    }
+
+    @Test
+    public void testForceToActualLeader() {
+        long leaseStartTime = 10;
+
+        leaderElection(ANOTHER_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), false);
+
+        assertTrue(respFut0.isDone());
+
+        LeaseGrantedMessageResponse resp0 = respFut0.join();
+        assertFalse(resp0.accepted());
+        assertEquals(ANOTHER_NODE.name(), resp0.redirectProposal());
+
+        // After declining the lease grant, local node is elected as a leader and new message with force == true is sent to this
+        // node as actual leader.
+        leaderElection(LOCAL_NODE);
+
+        updateIndex(1L);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut1 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), true);
+        assertTrue(respFut1.isDone());
+
+        LeaseGrantedMessageResponse resp1 = respFut1.join();
+
+        assertTrue(resp1.accepted());
+        assertNull(resp1.redirectProposal());
+    }
+
+    @Test
+    public void testIncorrectMessageToReplica() {
+        CompletableFuture<?> future = replica.processPlacementDriverMessage(MSG_FACTORY.leaseGrantedMessageResponse().build());
+        assertTrue(future.isDone());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testLongReadIndexWait() {
+        countOfTimeoutExceptionsOnReadIndexToThrow = 100;
+        updateIndex(1L);
+        leaderElection(LOCAL_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(1), hts(10), false);
+        // Actually, it completes faster because TimeoutException is thrown from mock instantly.
+        assertThat(respFut0, willSucceedIn(5, TimeUnit.SECONDS));
+        assertEquals(0, countOfTimeoutExceptionsOnReadIndexToThrow);
+    }
+}
diff --git a/modules/rest-api/openapi/openapi.yaml b/modules/rest-api/openapi/openapi.yaml
index 53cd30ff45..727589e27c 100644
--- a/modules/rest-api/openapi/openapi.yaml
+++ b/modules/rest-api/openapi/openapi.yaml
@@ -327,7 +327,7 @@ paths:
parameters: []
responses:
"200":
- description: All statuses returned successfully.
+ description: All statuses returned successfully.
content:
application/json:
schema:
@@ -402,7 +402,7 @@ paths:
type: string
responses:
"200":
- description: All statuses returned successfully.
+ description: All statuses returned successfully.
content:
application/json:
schema:
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 26ea20da90..a5552a8972 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -122,6 +122,7 @@ dependencies {
integrationTestImplementation project(':ignite-jdbc')
integrationTestImplementation project(':ignite-security')
integrationTestImplementation project(':ignite-catalog')
+ integrationTestImplementation project(':ignite-placement-driver')
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-configuration')))
integrationTestImplementation(testFixtures(project(':ignite-schema')))
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a5ffa5a198..8d03cfd3d9 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.cluster.management.configuration.ClusterManage
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.NodeBootstrapConfiguration;
import org.apache.ignite.internal.configuration.SecurityConfiguration;
@@ -92,6 +93,7 @@ import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAl
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
@@ -148,6 +150,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.table.Table;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.Nullable;
@@ -729,6 +732,14 @@ public class ItRebalanceDistributedTest {
schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager);
+ LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+ TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+ clusterService,
+ logicalTopologyService,
+ Loza.FACTORY,
+ new RaftGroupEventsClientListener()
+ );
+
tableManager = new TableManager(
name,
registry,
@@ -747,7 +758,8 @@ public class ItRebalanceDistributedTest {
schemaManager,
view -> new LocalLogStorageFactory(),
new HybridClockImpl(),
- new OutgoingSnapshotsManager(clusterService.messagingService())
+ new OutgoingSnapshotsManager(clusterService.messagingService()),
+ topologyAwareRaftGroupServiceFactory
) {
@Override
protected TxStateTableStorage createTxStateTableStorage(TableConfiguration tableCfg) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index ace3896bf4..32345e60a7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
@@ -113,6 +114,7 @@ import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
@@ -333,6 +335,13 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
SchemaManager schemaManager = new SchemaManager(registry, tblCfg, metaStorageMgr);
+ TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+ clusterSvc,
+ new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
+ Loza.FACTORY,
+ new RaftGroupEventsClientListener()
+ );
+
TableManager tableManager = new TableManager(
name,
registry,
@@ -351,7 +360,8 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
schemaManager,
view -> new LocalLogStorageFactory(),
hybridClock,
- new OutgoingSnapshotsManager(clusterSvc.messagingService())
+ new OutgoingSnapshotsManager(clusterSvc.messagingService()),
+ topologyAwareRaftGroupServiceFactory
);
var indexManager = new IndexManager(tblCfg, schemaManager, tableManager);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 004ec601da..52a494ac92 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -17,9 +17,6 @@
package org.apache.ignite.internal.app;
-import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_NAME;
-import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_SIZE;
-
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -32,9 +29,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -93,6 +87,7 @@ import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
@@ -126,7 +121,6 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.VaultService;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -277,8 +271,6 @@ public class IgniteImpl implements Ignite {
private final RestAddressReporter restAddressReporter;
- private final ScheduledExecutorService raftExecutorService;
-
private final DistributedConfigurationUpdater distributedConfigurationUpdater;
private final CatalogManager catalogManager;
@@ -341,12 +333,6 @@ public class IgniteImpl implements Ignite {
RaftConfiguration raftConfiguration = nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY);
- raftExecutorService = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
- new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterSvc.localConfiguration().getName(),
- CLIENT_POOL_NAME), LOG
- )
- );
-
// TODO https://issues.apache.org/jira/browse/IGNITE-19051
RaftGroupEventsClientListener raftGroupEventsClientListener = new RaftGroupEventsClientListener();
@@ -355,7 +341,6 @@ public class IgniteImpl implements Ignite {
raftConfiguration,
workDir,
clock,
- raftExecutorService,
raftGroupEventsClientListener
);
@@ -412,6 +397,13 @@ public class IgniteImpl implements Ignite {
TablesConfiguration tablesConfiguration = clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
+ TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+ clusterSvc,
+ logicalTopologyService,
+ Loza.FACTORY,
+ raftGroupEventsClientListener
+ );
+
// TODO: IGNITE-16985 Design table management flow
// placementDriverMgr = new PlacementDriverManager(
// metaStorageMgr,
@@ -491,7 +483,8 @@ public class IgniteImpl implements Ignite {
schemaManager,
volatileLogStorageFactoryCreator,
clock,
- outgoingSnapshotsManager
+ outgoingSnapshotsManager,
+ topologyAwareRaftGroupServiceFactory
);
indexManager = new IndexManager(tablesConfiguration, schemaManager, distributedTblMgr);
@@ -761,7 +754,6 @@ public class IgniteImpl implements Ignite {
public void stop() {
lifecycleManager.stopNode();
restAddressReporter.removeReport();
- IgniteUtils.shutdownAndAwaitTermination(raftExecutorService, 10, TimeUnit.SECONDS);
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index e8c5d5b257..8a80bb3ade 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -72,6 +72,7 @@ dependencies {
testImplementation project(':ignite-storage-rocksdb')
testImplementation project(':ignite-cluster-management')
testImplementation project(':ignite-vault')
+ testImplementation project(':ignite-placement-driver')
testImplementation libs.jmh.core
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-configuration')))
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 4d87d47f24..e0a6bd511f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -573,7 +574,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
schemaManager,
null,
clock,
- mock(OutgoingSnapshotsManager.class)
+ mock(OutgoingSnapshotsManager.class),
+ mock(TopologyAwareRaftGroupServiceFactory.class)
);
tableManager.start();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 05d68dcc4e..5d35035cc1 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -380,7 +380,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
partitionDataStorage,
storageUpdateHandler,
new TestTxStateStorage(),
- new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0))
+ new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)),
+ new PendingComparableValuesTracker<>(0L)
);
paths.put(listener, path);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 6680ef65cb..37643df468 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -17,6 +17,7 @@
package org.apache.ignite.distributed;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -44,6 +45,9 @@ import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -57,6 +61,7 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -106,6 +111,7 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
@@ -156,6 +162,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
protected Map<String, TxStateStorage> txStateStorages;
+ private Map<String, ClusterService> clusterServices;
+
protected final List<ClusterService> cluster = new CopyOnWriteArrayList<>();
private ScheduledThreadPoolExecutor executor;
@@ -227,9 +235,14 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
var nodeFinder = new StaticNodeFinder(localAddresses);
+ clusterServices = new java.util.concurrent.ConcurrentHashMap<>(nodes);
+ // Populated concurrently by the parallel stream below, so the map must be thread-safe (a plain HashMap would race).
 nodeFinder.findNodes().parallelStream()
- .map(addr -> startNode(testInfo, addr.toString(), addr.port(), nodeFinder))
- .forEach(cluster::add);
+ .forEach(addr -> {
+ ClusterService svc = startNode(testInfo, addr.toString(), addr.port(), nodeFinder);
+ cluster.add(svc);
+ clusterServices.put(svc.topologyService().localMember().name(), svc);
+ });
for (ClusterService node : cluster) {
assertTrue(waitForTopology(node, nodes, 1000));
@@ -424,11 +437,19 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
PendingComparableValuesTracker<HybridTimestamp> safeTime =
new PendingComparableValuesTracker<>(clocks.get(assignment).now());
+ PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(testMpPartStorage);
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.get().id(), pkStorage.get());
StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(partId, partitionDataStorage, indexes, dsCfg);
+ TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+ clusterServices.get(assignment),
+ logicalTopologyService(clusterServices.get(assignment)),
+ Loza.FACTORY,
+ new RaftGroupEventsClientListener()
+ );
+
CompletableFuture<Void> partitionReadyFuture = raftServers.get(assignment).startRaftGroupNode(
new RaftNodeId(grpId, configuration.peer(assignment)),
configuration,
@@ -436,9 +457,11 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
partitionDataStorage,
storageUpdateHandler,
txStateStorage,
- safeTime
+ safeTime,
+ storageIndexTracker
),
- RaftGroupEventsListener.noopLsnr
+ RaftGroupEventsListener.noopLsnr,
+ topologyAwareRaftGroupServiceFactory
).thenAccept(
raftSvc -> {
try {
@@ -462,8 +485,10 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
placementDriver,
storageUpdateHandler,
peer -> assignment.equals(peer.consistentId()),
- CompletableFuture.completedFuture(schemaManager)
- )
+ completedFuture(schemaManager)
+ ),
+ raftSvc,
+ storageIndexTracker
);
} catch (NodeStoppingException e) {
fail("Unexpected node stopping", e);
@@ -512,6 +537,30 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
return clients;
}
+ private LogicalTopologyService logicalTopologyService(ClusterService clusterService) {
+ return new LogicalTopologyService() {
+ @Override
+ public void addEventListener(LogicalTopologyEventListener listener) {
+ // No-op: this stub does not store listeners and never fires topology events.
+ }
+ // Both "on leader" queries below are answered from the local physical topology of clusterService.
+ @Override
+ public void removeEventListener(LogicalTopologyEventListener listener) {
+ // No-op: addEventListener registers nothing, so there is nothing to remove.
+ }
+ // NOTE(review): the constant 1 is presumably the snapshot version and never changes - confirm.
+ @Override
+ public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
+ return completedFuture(new LogicalTopologySnapshot(1, clusterService.topologyService().allMembers()));
+ }
+ // Every currently visible member is reported as validated (test-only simplification).
+ @Override
+ public CompletableFuture<Set<ClusterNode>> validatedNodesOnLeader() {
+ return completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
+ }
+ };
+ }
+
/**
* Returns a raft manager for a group.
*
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index dde0f20c06..83f6dbb59a 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -46,6 +47,7 @@ import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -126,7 +128,12 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
(message, sender, correlationId) -> {
try {
- replicaManager.startReplica(tablePartitionId, request0 -> CompletableFuture.completedFuture(null));
+ replicaManager.startReplica(
+ tablePartitionId,
+ request0 -> CompletableFuture.completedFuture(null),
+ mock(TopologyAwareRaftGroupService.class),
+ new PendingComparableValuesTracker<>(0L)
+ );
} catch (NodeStoppingException e) {
throw new RuntimeException(e);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4302d5b154..17fdced54e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -100,6 +100,8 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -278,6 +280,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private final OutgoingSnapshotsManager outgoingSnapshotsManager;
+ private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory;
+
/** Partitions storage path. */
private final Path storagePath;
@@ -322,6 +326,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @param schemaManager Schema manager.
* @param volatileLogStorageFactoryCreator Creator for {@link org.apache.ignite.internal.raft.storage.LogStorageFactory} for
* volatile tables.
+ * @param raftGroupServiceFactory Factory that is used for creation of raft group services for replication groups.
*/
public TableManager(
String nodeName,
@@ -341,7 +346,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
SchemaManager schemaManager,
LogStorageFactoryCreator volatileLogStorageFactoryCreator,
HybridClock clock,
- OutgoingSnapshotsManager outgoingSnapshotsManager
+ OutgoingSnapshotsManager outgoingSnapshotsManager,
+ TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory
) {
this.tablesCfg = tablesCfg;
this.clusterService = clusterService;
@@ -358,6 +364,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator;
this.clock = clock;
this.outgoingSnapshotsManager = outgoingSnapshotsManager;
+ this.raftGroupServiceFactory = raftGroupServiceFactory;
clusterNodeResolver = topologyService::getByConsistentId;
@@ -690,6 +697,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+ PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
CompletableFuture<PartitionStorages> partitionStoragesFut = getOrCreatePartitionStorages(table, partId);
@@ -780,7 +788,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
partitionDataStorage,
storageUpdateHandler,
txStatePartitionStorage,
- safeTime
+ safeTime,
+ storageIndexTracker
),
new RebalanceRaftGroupEventsListener(
metaStorageMgr,
@@ -807,7 +816,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
startGroupFut
.thenComposeAsync(v -> inBusyLock(busyLock, () -> {
try {
- return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration);
+ return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory);
} catch (NodeStoppingException ex) {
return failedFuture(ex);
}
@@ -845,7 +854,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
this::isLocalPeer,
schemaManager.schemaRegistry(causalityToken, tblId),
storageReadyLatch
- )
+ ),
+ updatedRaftGroupService,
+ storageIndexTracker
);
} catch (NodeStoppingException ex) {
throw new AssertionError("Loza was stopped before Table manager", ex);
@@ -1974,6 +1985,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
.anyMatch(assignment -> !stableAssignments.contains(assignment));
var safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+ PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
InternalTable internalTable = tbl.internalTable();
@@ -2009,7 +2021,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
partitionDataStorage,
storageUpdateHandler,
txStatePartitionStorage,
- safeTime
+ safeTime,
+ storageIndexTracker
);
RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener(
@@ -2058,7 +2071,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
storageUpdateHandler,
this::isLocalPeer,
completedFuture(schemaManager.schemaRegistry(tblId))
- )
+ ),
+ (TopologyAwareRaftGroupService) internalTable.partitionRaftGroupService(partId),
+ storageIndexTracker
);
} catch (NodeStoppingException ignored) {
// no-op
@@ -2293,7 +2308,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
replicaMgr.stopReplica(tablePartitionId);
} catch (NodeStoppingException e) {
- // no-op
+ // No-op.
}
return tablesByIdVv.get(evt.revision())
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index f9c2fb6bcf..60c9870a29 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -82,22 +82,28 @@ public class PartitionListener implements RaftGroupListener {
/** Safe time tracker. */
private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+ /** Storage index tracker. */
+ private final PendingComparableValuesTracker<Long> storageIndexTracker;
+
/**
* The constructor.
*
* @param partitionDataStorage The storage.
* @param safeTime Safe time tracker.
+ * @param storageIndexTracker Storage index tracker.
*/
public PartitionListener(
PartitionDataStorage partitionDataStorage,
StorageUpdateHandler storageUpdateHandler,
TxStateStorage txStateStorage,
- PendingComparableValuesTracker<HybridTimestamp> safeTime
+ PendingComparableValuesTracker<HybridTimestamp> safeTime,
+ PendingComparableValuesTracker<Long> storageIndexTracker
) {
this.storage = partitionDataStorage;
this.storageUpdateHandler = storageUpdateHandler;
this.txStateStorage = txStateStorage;
this.safeTime = safeTime;
+ this.storageIndexTracker = storageIndexTracker;
// TODO: IGNITE-18502 Implement a pending update storage
try (PartitionTimestampCursor cursor = partitionDataStorage.getStorage().scan(HybridTimestamp.MAX_VALUE)) {
@@ -179,6 +185,8 @@ public class PartitionListener implements RaftGroupListener {
safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp());
}
+
+ storageIndexTracker.update(commandIndex);
});
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 357b2ab430..0da68a8da7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -387,7 +387,7 @@ public class PartitionReplicaListener implements ReplicaListener {
} else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary);
} else if (request instanceof ReplicaSafeTimeSyncRequest) {
- return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request);
+ return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request, isPrimary);
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
@@ -622,9 +622,16 @@ public class PartitionReplicaListener implements ReplicaListener {
 * Handler to process {@link ReplicaSafeTimeSyncRequest}.
 *
 * @param request Request.
+ * @param isPrimary Whether this replica is the primary one; must not be {@code null}.
 * @return Future.
 */
- private CompletableFuture<Void> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
+ private CompletableFuture<Void> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request, Boolean isPrimary) {
+ requireNonNull(isPrimary);
+ // Only the primary replica proposes the safe-time sync command; on any other replica the request completes immediately.
+ if (!isPrimary) {
+ return completedFuture(null);
+ }
+
 return raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(hybridTimestamp(hybridClock.now())).build());
 }
@@ -2071,7 +2078,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
}
);
- } else if (request instanceof ReadOnlyReplicaRequest) {
+ } else if (request instanceof ReadOnlyReplicaRequest || request instanceof ReplicaSafeTimeSyncRequest) {
return raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm -> isLocalPeerChecker.apply(replicaAndTerm.leader()));
} else {
return completedFuture(null);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index 9294016373..5b8396b5d8 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.schema.SchemaManager;
@@ -227,7 +228,8 @@ public class TableManagerDistributionZonesTest extends IgniteAbstractTest {
mock(SchemaManager.class),
null,
null,
- mock(OutgoingSnapshotsManager.class)
+ mock(OutgoingSnapshotsManager.class),
+ mock(TopologyAwareRaftGroupServiceFactory.class)
);
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 5d727ad316..10592e3eb5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
@@ -489,7 +490,7 @@ public class TableManagerTest extends IgniteAbstractTest {
TableImpl table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
- verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any());
+ verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), any());
TableManager tableManager = tblManagerFut.join();
@@ -728,7 +729,8 @@ public class TableManagerTest extends IgniteAbstractTest {
sm = new SchemaManager(revisionUpdater, tblsCfg, msm),
budgetView -> new LocalLogStorageFactory(),
new HybridClockImpl(),
- new OutgoingSnapshotsManager(clusterService.messagingService())
+ new OutgoingSnapshotsManager(clusterService.messagingService()),
+ mock(TopologyAwareRaftGroupServiceFactory.class)
) {
@Override
protected MvTableStorage createTableStorage(TableConfiguration tableCfg, TablesConfiguration tablesCfg) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 7861ae3e31..efa846bb69 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -198,7 +198,8 @@ public class PartitionCommandListenerTest {
partitionDataStorage,
storageUpdateHandler,
txStateStorage,
- safeTimeTracker
+ safeTimeTracker,
+ new PendingComparableValuesTracker<>(0L)
);
}
@@ -288,7 +289,8 @@ public class PartitionCommandListenerTest {
partitionDataStorage,
storageUpdateHandler,
txStateStorage,
- new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0))
+ new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)),
+ new PendingComparableValuesTracker<>(0L)
);
txStateStorage.lastApplied(3L, 1L);
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 57ebfe0dba..eb49f6da65 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -298,7 +298,8 @@ public class DummyInternalTableImpl extends InternalTableImpl {
new TestPartitionDataStorage(mvPartStorage),
storageUpdateHandler,
txStateStorage().getOrCreateTxStateStorage(PART_ID),
- safeTime
+ safeTime,
+ new PendingComparableValuesTracker<>(0L)
);
safeTimeUpdaterThread = new Thread(new SafeTimeUpdater(safeTime), "safe-time-updater");
diff --git a/settings.gradle b/settings.gradle
index f33fd8d336..5cb7e03f00 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -65,6 +65,7 @@ include(':packaging')
include(':ignite-replicator')
include(':ignite-distribution-zones')
include(':ignite-placement-driver')
+include(':ignite-placement-driver-api')
include(':ignite-code-deployment')
include(':ignite-security')
include(':ignite-catalog')
@@ -118,6 +119,7 @@ project(":packaging-db").projectDir = file('packaging/db')
project(":packaging").projectDir = file('packaging')
project(":ignite-distribution-zones").projectDir = file('modules/distribution-zones')
project(":ignite-placement-driver").projectDir = file('modules/placement-driver')
+project(":ignite-placement-driver-api").projectDir = file('modules/placement-driver-api')
project(":ignite-code-deployment").projectDir = file('modules/code-deployment')
project(":ignite-security").projectDir = file('modules/security')
project(":ignite-catalog").projectDir = file('modules/catalog')