You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/03/27 08:06:12 UTC

[ignite-3] 01/01: IGNITE-18744 Implement primary replica side leaseGrant handler (#1765)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit aef992b2fbb7f16343ded9adb6be22e063ec015d
Author: Denis Chudov <mo...@gmail.com>
AuthorDate: Mon Mar 27 10:42:09 2023 +0300

    IGNITE-18744 Implement primary replica side leaseGrant handler (#1765)
---
 .../ignite/internal/hlc/HybridTimestamp.java       |  42 ++
 .../apache/ignite/internal/util/IgniteUtils.java   |  50 ++
 .../build.gradle                                   |  11 +-
 .../message/LeaseGrantedMessage.java}              |  30 +-
 .../message/LeaseGrantedMessageResponse.java}      |  24 +-
 .../message/PlacementDriverMessageGroup.java}      |  32 +-
 .../message/PlacementDriverReplicaMessage.java}    |  31 +-
 modules/placement-driver/build.gradle              |   2 +
 .../internal/placementdriver/ActiveActorTest.java  | 592 ++++++++++++++++++++-
 .../PlacementDriverManagerTest.java                |  35 +-
 .../internal/placementdriver/LeaseUpdater.java     |  25 +-
 .../placementdriver/PlacementDriverManager.java    |  74 +--
 .../apache/ignite/internal/raft/RaftManager.java   |  33 ++
 .../ignite/internal/raft/RaftServiceFactory.java   |  45 ++
 .../internal/raft/service/RaftGroupService.java    |   7 +
 .../java/org/apache/ignite/internal/raft/Loza.java |  83 ++-
 .../ignite/internal/raft/RaftGroupServiceImpl.java |  13 +
 .../ignite/internal/raft/RaftGroupServiceTest.java |  36 ++
 modules/replicator/build.gradle                    |  14 +-
 .../raft/client/TopologyAwareRaftGroupService.java |  22 +-
 .../TopologyAwareRaftGroupServiceFactory.java      |  84 +++
 .../apache/ignite/internal/replicator/Replica.java | 174 +++++-
 .../ignite/internal/replicator/ReplicaManager.java | 181 ++++---
 .../client/TopologyAwareRaftGroupServiceTest.java  | 101 +---
 .../replicator/PlacementDriverReplicaSideTest.java | 340 ++++++++++++
 modules/rest-api/openapi/openapi.yaml              |   4 +-
 modules/runner/build.gradle                        |   1 +
 .../storage/ItRebalanceDistributedTest.java        |  14 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  12 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  28 +-
 modules/sql-engine/build.gradle                    |   1 +
 .../sql/engine/exec/MockedStructuresTest.java      |   4 +-
 .../ignite/distributed/ItTablePersistenceTest.java |   3 +-
 .../distributed/ItTxDistributedTestSingleNode.java |  61 ++-
 .../ignite/distributed/ReplicaUnavailableTest.java |   9 +-
 .../internal/table/distributed/TableManager.java   |  29 +-
 .../table/distributed/raft/PartitionListener.java  |  10 +-
 .../replicator/PartitionReplicaListener.java       |  13 +-
 .../TableManagerDistributionZonesTest.java         |   4 +-
 .../table/distributed/TableManagerTest.java        |   6 +-
 .../raft/PartitionCommandListenerTest.java         |   6 +-
 .../table/impl/DummyInternalTableImpl.java         |   3 +-
 settings.gradle                                    |   2 +
 43 files changed, 1881 insertions(+), 410 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index d3c8415cc8..e06a975ae4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -34,6 +34,12 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria
     /** A constant holding the maximum value a {@code HybridTimestamp} can have. */
     public static final HybridTimestamp MAX_VALUE = new HybridTimestamp(Long.MAX_VALUE, Integer.MAX_VALUE);
 
+    /**
+     * Cluster clock skew. Defines the inclusive interval within which timestamps from different nodes are not comparable.
+     * TODO: IGNITE-18978 Method to compare timestamps with clock skew.
+     */
+    private static final long CLOCK_SKEW = 7L;
+
     /** Physical clock. */
     private final long physical;
 
@@ -131,6 +137,42 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria
         return result;
     }
 
+    /**
+     * Compares two timestamps with the clock skew.
+     * Timestamps t1 and t2 are comparable only if t1 is not contained in [t2 - CLOCK_SKEW; t2 + CLOCK_SKEW].
+     * TODO: IGNITE-18978 Method to compare timestamps with clock skew.
+     *
+     * @param anotherTimestamp Another timestamp.
+     * @return Positive or negative comparison result, or {@code 0} if the timestamps are not comparable.
+     */
+    private int compareWithClockSkew(HybridTimestamp anotherTimestamp) {
+        if (getPhysical() - CLOCK_SKEW <= anotherTimestamp.getPhysical() && getPhysical() + CLOCK_SKEW >= anotherTimestamp.getPhysical()) {
+            return 0;
+        }
+
+        return compareTo(anotherTimestamp);
+    }
+
+    /**
+     * Defines whether this timestamp is strictly before the given one, taking the clock skew into account.
+     *
+     * @param anotherTimestamp Another timestamp.
+     * @return Whether this timestamp is before the given one or not.
+     */
+    public boolean before(HybridTimestamp anotherTimestamp) {
+        return compareWithClockSkew(anotherTimestamp) < 0;
+    }
+
+    /**
+     * Defines whether this timestamp is strictly after the given one, taking the clock skew into account.
+     *
+     * @param anotherTimestamp Another timestamp.
+     * @return Whether this timestamp is after the given one or not.
+     */
+    public boolean after(HybridTimestamp anotherTimestamp) {
+        return compareWithClockSkew(anotherTimestamp) > 0;
+    }
+
     @Override
     public int compareTo(HybridTimestamp other) {
         if (this.physical == other.physical) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index efa3d4f15e..ea16c07854 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -48,10 +48,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
@@ -1005,4 +1007,52 @@ public class IgniteUtils {
 
         return Optional.empty();
     }
+
+    /**
+     * Retries the operation until it succeeds or fails with an exception that satisfies {@code stopRetryCondition}.
+     *
+     * @param operation Operation.
+     * @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be stopped.
+     * @param executor Executor to make retry in.
+     * @return Future that completes when the operation succeeds or fails with an exception that satisfies {@code stopRetryCondition}.
+     */
+    public static <T> CompletableFuture<T> retryOperationUntilSuccess(
+            Supplier<CompletableFuture<T>> operation,
+            Function<Throwable, Boolean> stopRetryCondition,
+            Executor executor
+    ) {
+        CompletableFuture<T> fut = new CompletableFuture<>();
+
+        retryOperationUntilSuccess(operation, stopRetryCondition, fut, executor);
+
+        return fut;
+    }
+
+    /**
+     * Retries the operation until it succeeds or fails with an exception that satisfies {@code stopRetryCondition}.
+     *
+     * @param operation Operation.
+     * @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be stopped.
+     * @param executor Executor to make retry in.
+     * @param fut Future completed when the operation succeeds or fails with an exception accepted by {@code stopRetryCondition}.
+     */
+    public static <T> void retryOperationUntilSuccess(
+            Supplier<CompletableFuture<T>> operation,
+            Function<Throwable, Boolean> stopRetryCondition,
+            CompletableFuture<T> fut,
+            Executor executor
+    ) {
+        operation.get()
+                .whenComplete((res, e) -> {
+                    if (e == null) {
+                        fut.complete(res);
+                    } else {
+                        if (stopRetryCondition.apply(e)) {
+                            fut.completeExceptionally(e);
+                        } else {
+                            executor.execute(() -> retryOperationUntilSuccess(operation, stopRetryCondition, fut, executor));
+                        }
+                    }
+                });
+    }
 }
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/build.gradle
similarity index 79%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/build.gradle
index 34b50e0c45..81779696ef 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/build.gradle
@@ -19,18 +19,11 @@ apply from: "$rootDir/buildscripts/java-core.gradle"
 apply from: "$rootDir/buildscripts/publishing.gradle"
 apply from: "$rootDir/buildscripts/java-junit5.gradle"
 
-
 dependencies {
     annotationProcessor project(":ignite-network-annotation-processor")
+
     implementation project(':ignite-core')
-    implementation project(':ignite-raft-api')
-    implementation project(':ignite-configuration-api')
     implementation project(':ignite-network-api')
-    implementation libs.jetbrains.annotations
-    implementation libs.fastutil.core
-
-    testImplementation libs.mockito.core
-    testImplementation libs.mockito.junit
 }
 
-description = 'ignite-replicator'
+description = 'ignite-placement-driver-api'
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
similarity index 55%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
index 34b50e0c45..ef009b0001 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.placementdriver.message;
 
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
 
-dependencies {
-    annotationProcessor project(":ignite-network-annotation-processor")
-    implementation project(':ignite-core')
-    implementation project(':ignite-raft-api')
-    implementation project(':ignite-configuration-api')
-    implementation project(':ignite-network-api')
-    implementation libs.jetbrains.annotations
-    implementation libs.fastutil.core
+/**
+ * Lease granted message.
+ */
+@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE)
+public interface LeaseGrantedMessage extends PlacementDriverReplicaMessage {
+    @Marshallable
+    HybridTimestamp leaseStartTime();
 
-    testImplementation libs.mockito.core
-    testImplementation libs.mockito.junit
-}
+    @Marshallable
+    HybridTimestamp leaseExpirationTime();
 
-description = 'ignite-replicator'
+    boolean force();
+}
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessageResponse.java
similarity index 55%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessageResponse.java
index 34b50e0c45..c6addb77ad 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessageResponse.java
@@ -15,22 +15,16 @@
  * limitations under the License.
  */
 
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.placementdriver.message;
 
+import org.apache.ignite.network.annotations.Transferable;
 
-dependencies {
-    annotationProcessor project(":ignite-network-annotation-processor")
-    implementation project(':ignite-core')
-    implementation project(':ignite-raft-api')
-    implementation project(':ignite-configuration-api')
-    implementation project(':ignite-network-api')
-    implementation libs.jetbrains.annotations
-    implementation libs.fastutil.core
+/**
+ * Response for lease granted message.
+ */
+@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE_RESPONSE)
+public interface LeaseGrantedMessageResponse extends PlacementDriverReplicaMessage {
+    boolean accepted();
 
-    testImplementation libs.mockito.core
-    testImplementation libs.mockito.junit
+    String redirectProposal();
 }
-
-description = 'ignite-replicator'
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverMessageGroup.java
similarity index 53%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverMessageGroup.java
index 34b50e0c45..b73cdfeea1 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverMessageGroup.java
@@ -15,22 +15,24 @@
  * limitations under the License.
  */
 
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.placementdriver.message;
 
+import static org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup.GROUP_NAME;
+import static org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup.GROUP_TYPE;
 
-dependencies {
-    annotationProcessor project(":ignite-network-annotation-processor")
-    implementation project(':ignite-core')
-    implementation project(':ignite-raft-api')
-    implementation project(':ignite-configuration-api')
-    implementation project(':ignite-network-api')
-    implementation libs.jetbrains.annotations
-    implementation libs.fastutil.core
+import org.apache.ignite.network.annotations.MessageGroup;
 
-    testImplementation libs.mockito.core
-    testImplementation libs.mockito.junit
-}
+/**
+ * Message group for placement driver messages.
+ */
+@MessageGroup(groupType = GROUP_TYPE, groupName = GROUP_NAME)
+public interface PlacementDriverMessageGroup {
+    /** Placement driver message group type. */
+    short GROUP_TYPE = 11;
+
+    String GROUP_NAME = "PlacementDriverMessages";
 
-description = 'ignite-replicator'
+    short LEASE_GRANTED_MESSAGE = 0;
+
+    short LEASE_GRANTED_MESSAGE_RESPONSE = 1;
+}
diff --git a/modules/replicator/build.gradle b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverReplicaMessage.java
similarity index 55%
copy from modules/replicator/build.gradle
copy to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverReplicaMessage.java
index 34b50e0c45..9f26f228a8 100644
--- a/modules/replicator/build.gradle
+++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverReplicaMessage.java
@@ -15,22 +15,21 @@
  * limitations under the License.
  */
 
-apply from: "$rootDir/buildscripts/java-core.gradle"
-apply from: "$rootDir/buildscripts/publishing.gradle"
-apply from: "$rootDir/buildscripts/java-junit5.gradle"
+package org.apache.ignite.internal.placementdriver.message;
 
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
 
-dependencies {
-    annotationProcessor project(":ignite-network-annotation-processor")
-    implementation project(':ignite-core')
-    implementation project(':ignite-raft-api')
-    implementation project(':ignite-configuration-api')
-    implementation project(':ignite-network-api')
-    implementation libs.jetbrains.annotations
-    implementation libs.fastutil.core
-
-    testImplementation libs.mockito.core
-    testImplementation libs.mockito.junit
+/**
+ * Placement driver replica messages.
+ */
+public interface PlacementDriverReplicaMessage extends NetworkMessage {
+    /**
+     * Gets a replication group id.
+     *
+     * @return Replication group id.
+     */
+    @Marshallable
+    ReplicationGroupId groupId();
 }
-
-description = 'ignite-replicator'
diff --git a/modules/placement-driver/build.gradle b/modules/placement-driver/build.gradle
index c2bb5c894b..02bf76e2ef 100644
--- a/modules/placement-driver/build.gradle
+++ b/modules/placement-driver/build.gradle
@@ -34,6 +34,7 @@ dependencies {
     implementation project(':ignite-affinity')
     implementation project(':ignite-vault')
     implementation project(':ignite-rocksdb-common:')
+    implementation project(':ignite-replicator')
 
     implementation libs.jetbrains.annotations
 
@@ -48,6 +49,7 @@ dependencies {
     integrationTestImplementation project(':ignite-schema')
     integrationTestImplementation project(':ignite-table')
     integrationTestImplementation project(':ignite-affinity')
+    integrationTestImplementation project(':ignite-replicator')
 
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     integrationTestImplementation(testFixtures(project(':ignite-network')))
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 8cc78bb9e5..ba5c983058 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -19,22 +19,69 @@ package org.apache.ignite.internal.placementdriver;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.placementdriver.ActiveActorTest.TestReplicationGroup.GROUP_ID;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.nio.file.Path;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.LeaderChangeNotification;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -44,9 +91,21 @@ import org.mockito.quality.Strictness;
 /**
  * Placement driver active actor test.
  */
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
 @MockitoSettings(strictness = Strictness.LENIENT)
-public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
+public class ActiveActorTest extends IgniteAbstractTest {
+    /** RAFT message factory. */
+    private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
+
+    /** Base node port. */
+    private static final int PORT_BASE = 1234;
+
+    @InjectConfiguration
+    protected RaftConfiguration raftConfiguration;
+
+    /** RPC executor. */
+    protected ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", log));
+
     private Map<String, PlacementDriverManager> placementDriverManagers = new HashMap<>();
 
     @Mock
@@ -57,36 +116,54 @@ public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
 
     @AfterEach
     @Override
-    protected void afterTest() throws Exception {
+    public void tearDown(TestInfo testInfo) throws Exception {
         List<AutoCloseable> closeables = placementDriverManagers.values().stream().map(p -> (AutoCloseable) p::stop).collect(toList());
 
         closeAll(closeables);
 
         placementDriverManagers.clear();
 
-        super.afterTest();
+        super.tearDown(testInfo);
     }
 
-    /** {@inheritDoc} */
-    @Override
+    /**
+     * The method is called after every node of the cluster starts.
+     *
+     * @param nodeName Node name.
+     * @param clusterService Cluster service.
+     * @param dataPath Data path for raft node.
+     * @param placementDriverNodesNames Names of all nodes in raft group.
+     * @param eventsClientListener Raft events listener for client.
+     */
     protected void afterNodeStart(
             String nodeName,
             ClusterService clusterService,
+            Path dataPath,
             Set<String> placementDriverNodesNames,
             RaftGroupEventsClientListener eventsClientListener
     ) {
+        var raftManager = new Loza(clusterService, raftConfiguration, dataPath, new HybridClockImpl());
+
+        LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService);
+
+        var raftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+                clusterService,
+                logicalTopologyService,
+                Loza.FACTORY,
+                eventsClientListener
+        );
+
         PlacementDriverManager placementDriverManager = new PlacementDriverManager(
                 msm,
                 new VaultManager(new InMemoryVaultService()),
-                TestReplicationGroup.GROUP_ID,
+                GROUP_ID,
                 clusterService,
-                raftConfiguration,
                 () -> completedFuture(placementDriverNodesNames),
-                new LogicalTopologyServiceTestImpl(clusterService),
-                executor,
+                logicalTopologyService,
+                raftManager,
+                raftGroupServiceFactory,
                 tblsCfg,
-                new HybridClockImpl(),
-                eventsClientListener
+                new HybridClockImpl()
         );
 
         placementDriverManager.start();
@@ -95,27 +172,14 @@ public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
     }
 
     /**
-     * The method is called after every node of the cluster starts.
+     * The method is called after every node of the cluster stops.
      *
      * @param nodeName Node name.
      */
-    @Override
-    protected void afterNodeStop(String nodeName) {
+    private void afterNodeStop(String nodeName) {
         placementDriverManagers.remove(nodeName);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    protected boolean afterInitCheckCondition(String leaderName) {
-        return checkSingleActiveActor(leaderName);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    protected boolean afterLeaderChangeCheckCondition(String leaderName) {
-        return checkSingleActiveActor(leaderName);
-    }
-
     private boolean checkSingleActiveActor(String leaderName) {
         for (Map.Entry<String, PlacementDriverManager> e : placementDriverManagers.entrySet()) {
             if (e.getValue().isActiveActor() != e.getKey().equals(leaderName)) {
@@ -125,4 +189,476 @@ public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
 
         return true;
     }
+
+    /**
+     * Starts a two-node cluster, waits for the first leader notification and checks the active actor on the elected leader.
+     */
+    @Test
+    public void testOneNodeReplicationGroup(TestInfo testInfo) throws Exception {
+        int nodes = 2;
+        HashMap<NetworkAddress, ClusterService> clusterServices = new HashMap<>();
+        HashMap<NetworkAddress, JraftServerImpl> raftServers = new HashMap<>();
+
+        // The raft client is created on the node that listens on PORT_BASE + 1.
+        TopologyAwareRaftGroupService raftClient =
+                startCluster(testInfo, clusterServices, raftServers, addr -> true, nodes, PORT_BASE + 1);
+
+        // Completed by the first leader notification delivered to the subscriber.
+        CompletableFuture<ClusterNode> leaderFut = new CompletableFuture<>();
+
+        raftClient.subscribeLeader((node, term) -> leaderFut.complete(node));
+
+        ClusterNode leader = leaderFut.get(10, TimeUnit.SECONDS);
+
+        assertNotNull(leader);
+
+        // Wait until the active-actor check holds for the elected leader.
+        afterInitCheckConditionWithWait(leader.name());
+
+        stopCluster(clusterServices, raftServers, List.of(raftClient), nodes);
+    }
+
+    @Test
+    public void testChangeLeaderWhenActualLeft(TestInfo testInfo) throws Exception {
+        var clusterServices = new HashMap<NetworkAddress, ClusterService>();
+        var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+        int nodes = 3;
+        Predicate<NetworkAddress> isServerAddress = addr -> true;
+        // Start the raft servers; the client returned here is created on the node that listens on PORT_BASE.
+        TopologyAwareRaftGroupService raftClient = startCluster(
+                testInfo,
+                clusterServices,
+                raftServers,
+                isServerAddress,
+                nodes,
+                PORT_BASE
+        );
+
+        raftClient.refreshLeader().get();
+        // Extra cluster node outside the server address range: no raft server is started on it, only the second client.
+        var clientClusterService = clusterService(testInfo, PORT_BASE + nodes + 1, new StaticNodeFinder(getNetworkAddresses(nodes)));
+        clientClusterService.start();
+        // Passing a null listener makes the helper register its own leader-change handler; notifyOnSubscription is false.
+        TopologyAwareRaftGroupService raftClientNoInitialNotify = startTopologyAwareClient(
+                clientClusterService,
+                clusterServices,
+                isServerAddress,
+                nodes,
+                null,
+                false
+        );
+
+        AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+        AtomicReference<ClusterNode> leaderRefNoInitialNotify = new AtomicReference<>();
+        AtomicInteger callsCount = new AtomicInteger();
+
+        raftClient.subscribeLeader((node, term) -> leaderRef.set(node));
+        // Resubscribe twice; the assertion after the loop tolerates at most one callback invocation.
+        for (int i = 0; i < 2; i++) {
+            raftClientNoInitialNotify.unsubscribeLeader();
+
+            raftClientNoInitialNotify.subscribeLeader((node, term) -> {
+                callsCount.incrementAndGet();
+                leaderRefNoInitialNotify.set(node);
+            });
+        }
+
+        assertTrue(callsCount.get() <= 1);
+
+        assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000));
+
+        ClusterNode leader = leaderRef.get();
+
+        assertNotNull(leader);
+
+        log.info("Leader: " + leader);
+
+        afterInitCheckConditionWithWait(leader.name());
+        // Take the current leader down: first its raft server, then its cluster service.
+        var raftServiceToStop = raftServers.remove(new NetworkAddress("localhost", leader.address().port()));
+        raftServiceToStop.stopRaftNodes(GROUP_ID);
+        raftServiceToStop.stop();
+
+        afterNodeStop(leader.name());
+
+        clusterServices.remove(new NetworkAddress("localhost", leader.address().port())).stop();
+        // Both references must stop pointing at the stopped leader; NOTE(review): a still-null reference also passes the 2nd check.
+        assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()), 10_000));
+        assertTrue(waitForCondition(() -> !leader.equals(leaderRefNoInitialNotify.get()), 1000));
+
+        log.info("New Leader: " + leaderRef.get());
+
+        afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
+
+        raftClientNoInitialNotify.refreshLeader().get();
+
+        assertEquals(raftClientNoInitialNotify.leader().consistentId(), leaderRef.get().name());
+
+        stopCluster(clusterServices, raftServers, List.of(raftClient, raftClientNoInitialNotify), nodes);
+
+        clientClusterService.stop();
+    }
+
+    @Test
+    public void testChangeLeaderForce(TestInfo testInfo) throws Exception {
+        var clusterServices = new HashMap<NetworkAddress, ClusterService>();
+        var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+        int nodes = 3;
+        Predicate<NetworkAddress> isServerAddress = addr -> true;
+        // Start the raft servers; the client returned here is created on the node that listens on PORT_BASE.
+        TopologyAwareRaftGroupService raftClient = startCluster(
+                testInfo,
+                clusterServices,
+                raftServers,
+                isServerAddress,
+                nodes,
+                PORT_BASE
+        );
+
+        raftClient.refreshLeader().get();
+        // Extra cluster node outside the server address range: no raft server is started on it, only the second client.
+        var clientClusterService = clusterService(testInfo, PORT_BASE + nodes + 1, new StaticNodeFinder(getNetworkAddresses(nodes)));
+        clientClusterService.start();
+        // Passing a null listener makes the helper register its own leader-change handler; notifyOnSubscription is false.
+        TopologyAwareRaftGroupService raftClientNoInitialNotify = startTopologyAwareClient(
+                clientClusterService,
+                clusterServices,
+                isServerAddress,
+                nodes,
+                null,
+                false
+        );
+
+        AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+        AtomicReference<ClusterNode> leaderRefNoInitialNotify = new AtomicReference<>();
+        AtomicInteger callsCount = new AtomicInteger();
+
+        raftClient.subscribeLeader((node, term) -> leaderRef.set(node));
+        // Resubscribe twice; the assertion after the loop tolerates at most one callback invocation.
+        for (int i = 0; i < 2; i++) {
+            raftClientNoInitialNotify.unsubscribeLeader();
+
+            raftClientNoInitialNotify.subscribeLeader((node, term) -> {
+                callsCount.incrementAndGet();
+                leaderRefNoInitialNotify.set(node);
+            });
+        }
+
+        assertTrue(callsCount.get() <= 1);
+
+        assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000));
+
+        ClusterNode leader = leaderRef.get();
+
+        assertNotNull(leader);
+
+        log.info("Leader: " + leader);
+
+        afterInitCheckConditionWithWait(leader.name());
+        // Any peer other than the current leader becomes the leadership transfer target.
+        Peer newLeaderPeer = raftClient.peers().stream().filter(peer -> !leader.name().equals(peer.consistentId())).findAny().get();
+
+        log.info("Peer to transfer leader: " + newLeaderPeer);
+
+        raftClient.transferLeadership(newLeaderPeer).get();
+
+        String leaderId = newLeaderPeer.consistentId();
+        // Both subscribers must eventually report the transfer target as the leader.
+        assertTrue(waitForCondition(() -> leaderId.equals(leaderRef.get().name()), 10_000));
+        assertTrue(waitForCondition(
+                () -> leaderRefNoInitialNotify.get() != null && leaderId.equals(leaderRefNoInitialNotify.get().name()), 1000)
+        );
+
+        log.info("New Leader: " + leaderRef.get());
+
+        afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
+
+        raftClient.refreshLeader().get();
+
+        assertEquals(raftClient.leader().consistentId(), leaderRef.get().name());
+
+        stopCluster(clusterServices, raftServers, List.of(raftClient, raftClientNoInitialNotify), nodes);
+        clientClusterService.stop();
+    }
+
+    /**
+     * Shuts down the raft clients, then stops the raft server and the cluster service of every node that is still registered.
+     *
+     * @param clusterServices Cluster services keyed by node address.
+     * @param raftServers     RAFT servers keyed by node address.
+     * @param raftClients     RAFT clients to shut down, may be {@code null}.
+     * @param nodes           Node count.
+     * @throws Exception If failed.
+     */
+    private void stopCluster(
+            HashMap<NetworkAddress, ClusterService> clusterServices,
+            HashMap<NetworkAddress, JraftServerImpl> raftServers,
+            List<TopologyAwareRaftGroupService> raftClients,
+            int nodes
+    ) throws Exception {
+        if (raftClients != null) {
+            raftClients.forEach(TopologyAwareRaftGroupService::shutdown);
+        }
+        for (NetworkAddress addr : getNetworkAddresses(nodes)) {
+            // Entries removed earlier by a test (e.g. a stopped leader) are absent from the maps and are skipped.
+            JraftServerImpl raftServer = raftServers.get(addr);
+            if (raftServer != null) {
+                raftServer.stopRaftNodes(GROUP_ID);
+                raftServer.stop();
+            }
+
+            ClusterService cluster = clusterServices.get(addr);
+            if (cluster != null) {
+                cluster.stop();
+            }
+        }
+    }
+
+    /**
+     * Starts cluster services for all nodes, raft servers on the server nodes and a topology aware client on the client node.
+     *
+     * @param testInfo        Test info.
+     * @param clusterServices Map filled with the started cluster services, keyed by node address.
+     * @param raftServers     Map filled with the started RAFT servers, keyed by node address.
+     * @param isServerAddress Closure to determine a server node.
+     * @param nodes           Node count.
+     * @param clientPort      Port of the node where a client will start; that node must be a server node.
+     * @return Topology aware client, or {@code null} if no started node listens on {@code clientPort}.
+     */
+    private TopologyAwareRaftGroupService startCluster(
+            TestInfo testInfo,
+            HashMap<NetworkAddress, ClusterService> clusterServices,
+            HashMap<NetworkAddress, JraftServerImpl> raftServers,
+            Predicate<NetworkAddress> isServerAddress,
+            int nodes,
+            int clientPort
+    ) {
+        List<NetworkAddress> addresses = getNetworkAddresses(nodes);
+
+        var nodeFinder = new StaticNodeFinder(addresses);
+
+        TopologyAwareRaftGroupService raftClient = null;
+        // First pass: start a cluster service for every address; the peer set below is built from their local member names.
+        for (NetworkAddress addr : addresses) {
+            var cluster = clusterService(testInfo, addr.port(), nodeFinder);
+
+            cluster.start();
+
+            clusterServices.put(addr, cluster);
+        }
+
+        PeersAndLearners peersAndLearners = peersAndLearners(clusterServices, isServerAddress, nodes);
+
+        Set<String> placementDriverNodesNames = peersAndLearners.peers().stream().map(Peer::consistentId).collect(toSet());
+        // Second pass: start raft servers on the server nodes and the client on the node that listens on clientPort.
+        for (NetworkAddress addr : addresses) {
+            var cluster = clusterServices.get(addr);
+
+            RaftGroupEventsClientListener eventsClientListener = new RaftGroupEventsClientListener();
+
+            if (isServerAddress.test(addr)) { // RAFT server node
+                var localPeer = peersAndLearners.peers().stream()
+                        .filter(peer -> peer.consistentId().equals(cluster.topologyService().localMember().name())).findAny().get();
+
+                var dataPath = workDir.resolve("raft_" + localPeer.consistentId());
+
+                var raftServer = new JraftServerImpl(
+                        cluster,
+                        dataPath,
+                        new NodeOptions(),
+                        eventsClientListener
+                );
+                raftServer.start();
+
+                raftServer.startRaftNode(
+                        new RaftNodeId(GROUP_ID, localPeer),
+                        peersAndLearners,
+                        new TestRaftGroupListener(),
+                        RaftGroupOptions.defaults()
+                );
+
+                raftServers.put(addr, raftServer);
+
+                afterNodeStart(localPeer.consistentId(), cluster, dataPath, placementDriverNodesNames, eventsClientListener);
+            }
+
+            if (addr.port() == clientPort) {
+                assertTrue(isServerAddress.test(addr));
+
+                raftClient = startTopologyAwareClient(cluster, clusterServices, isServerAddress, nodes, eventsClientListener, true);
+            }
+        }
+
+        return raftClient;
+    }
+
+    private TopologyAwareRaftGroupService startTopologyAwareClient(
+            ClusterService localClusterService,
+            Map<NetworkAddress, ClusterService> clusterServices,
+            Predicate<NetworkAddress> isServerAddress,
+            int nodes,
+            RaftGroupEventsClientListener eventsClientListener,
+            boolean notifyOnSubscription
+    ) {
+        if (eventsClientListener == null) {
+            eventsClientListener = new RaftGroupEventsClientListener();
+            // The message handler lambda can capture only an effectively final local, hence the extra reference.
+            var finalEventsClientListener = eventsClientListener;
+            localClusterService.messagingService().addMessageHandler(RaftMessageGroup.class, (msg, sender, correlationId) -> {
+                if (msg instanceof LeaderChangeNotification) {
+                    LeaderChangeNotification msg0 = (LeaderChangeNotification) msg;
+
+                    ClusterNode node = localClusterService.topologyService().getByConsistentId(sender);
+                    finalEventsClientListener.onLeaderElected(msg0.groupId(), node, msg0.term());
+                }
+            });
+        }
+        // join() blocks until the future returned by start() completes.
+        return (TopologyAwareRaftGroupService) TopologyAwareRaftGroupService.start(
+                GROUP_ID,
+                localClusterService,
+                FACTORY,
+                raftConfiguration,
+                peersAndLearners(clusterServices, isServerAddress, nodes),
+                true,
+                executor,
+                new LogicalTopologyServiceTestImpl(localClusterService),
+                eventsClientListener,
+                notifyOnSubscription
+        ).join();
+    }
+
+    private static PeersAndLearners peersAndLearners(
+            Map<NetworkAddress, ClusterService> clusterServices,
+            Predicate<NetworkAddress> isServerAddress,
+            int nodes
+    ) {
+        // Peers are the local member names of the nodes whose address is accepted by isServerAddress.
+        Set<String> serverNames = getNetworkAddresses(nodes).stream().filter(isServerAddress)
+                .map(netAddr -> clusterServices.get(netAddr).topologyService().localMember().name()).collect(toSet());
+        return PeersAndLearners.fromConsistentIds(serverNames);
+    }
+
+    /**
+     * Generates a node address for each node.
+     * The addresses are {@code localhost} with consecutive ports starting at {@code PORT_BASE}.
+     *
+     * @param nodes Node count.
+     * @return List of network addresses.
+     */
+    private static List<NetworkAddress> getNetworkAddresses(int nodes) {
+        return IntStream.range(PORT_BASE, PORT_BASE + nodes)
+                .mapToObj(port -> new NetworkAddress("localhost", port))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Asserts that {@link #afterInitCheckCondition(String)} becomes true within the 10_000 timeout given to {@code waitForCondition}.
+     *
+     * @param leaderName Current leader name.
+     * @throws InterruptedException If interrupted while waiting.
+     */
+    private void afterInitCheckConditionWithWait(String leaderName) throws InterruptedException {
+        assertTrue(waitForCondition(() -> afterInitCheckCondition(leaderName), 10_000));
+    }
+
+    /**
+     * Condition checked after cluster and raft clients initialization; delegates to {@link #checkSingleActiveActor(String)}.
+     *
+     * @param leaderName Current leader name.
+     * @return Condition result.
+     */
+    private boolean afterInitCheckCondition(String leaderName) {
+        return checkSingleActiveActor(leaderName);
+    }
+
+    /**
+     * Asserts that {@link #afterLeaderChangeCheckCondition(String)} becomes true within the 10_000 timeout given to {@code waitForCondition}.
+     *
+     * @param leaderName Current leader name.
+     * @throws InterruptedException If interrupted while waiting.
+     */
+    private void afterLeaderChangeCheckConditionWithWait(String leaderName) throws InterruptedException {
+        assertTrue(waitForCondition(() -> afterLeaderChangeCheckCondition(leaderName), 10_000));
+    }
+
+    /**
+     * Condition checked after a leader change; delegates to {@link #checkSingleActiveActor(String)}.
+     *
+     * @param leaderName Current leader name.
+     * @return Condition result.
+     */
+    private boolean afterLeaderChangeCheckCondition(String leaderName) {
+        return checkSingleActiveActor(leaderName);
+    }
+
+    /**
+     * Replication group id for tests: a single-constant enum whose string form is {@code TestReplicationGroup}.
+     */
+    public enum TestReplicationGroup implements ReplicationGroupId {
+        /** The only replication group id. */
+        GROUP_ID;
+
+        /** {@inheritDoc} */
+        @Override
+        public String toString() {
+            return "TestReplicationGroup";
+        }
+    }
+
+    /** Test listener: completes write closures with {@code null}, reports snapshot load success, ignores other callbacks. */
+    private static class TestRaftGroupListener implements RaftGroupListener {
+        @Override
+        public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+            // Complete each write closure with a null result.
+            iterator.forEachRemaining(closure -> closure.result(null));
+        }
+
+        @Override
+        public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        }
+
+        @Override
+        public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+        }
+
+        @Override
+        public boolean onSnapshotLoad(Path path) {
+            return true;
+        }
+
+        @Override
+        public void onShutdown() {
+        }
+    }
+
+    /**
+     * Test implementation of {@link LogicalTopologyService} that reports all members of the given cluster service's topology.
+     */
+    protected static class LogicalTopologyServiceTestImpl implements LogicalTopologyService {
+        private final ClusterService clusterService;
+
+        public LogicalTopologyServiceTestImpl(ClusterService clusterService) {
+            this.clusterService = clusterService;
+        }
+
+        @Override
+        public void addEventListener(LogicalTopologyEventListener listener) {
+            // No-op: the listener is not stored, so it is never notified.
+        }
+
+        @Override
+        public void removeEventListener(LogicalTopologyEventListener listener) {
+            // No-op: listeners are never registered.
+        }
+
+        @Override
+        public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
+            return completedFuture(new LogicalTopologySnapshot(1, clusterService.topologyService().allMembers()));
+        }
+
+        @Override
+        public CompletableFuture<Set<ClusterNode>> validatedNodesOnLeader() {
+            return completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
+        }
+    }
 }
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index b929784784..eb8c04df9a 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -21,8 +21,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
 import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
-import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_NAME;
-import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_SIZE;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
@@ -39,8 +37,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -61,12 +57,12 @@ import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -108,8 +104,6 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
 
     private PlacementDriverManager placementDriverManager;
 
-    private ScheduledExecutorService raftExecutorService;
-
     private TestInfo testInfo;
 
     @BeforeEach
@@ -132,19 +126,22 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
         when(cmgManager.metaStorageNodes())
                 .thenReturn(completedFuture(Set.of(clusterService.localConfiguration().getName())));
 
-        raftExecutorService = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
-                new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterService.localConfiguration().getName(),
-                        CLIENT_POOL_NAME), log
-                ));
-
         RaftGroupEventsClientListener eventsClientListener = new RaftGroupEventsClientListener();
 
+        LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService);
+
+        TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+                clusterService,
+                logicalTopologyService,
+                Loza.FACTORY,
+                eventsClientListener
+        );
+
         raftManager = new Loza(
                 clusterService,
                 raftConfiguration,
                 workDir.resolve("loza"),
                 new HybridClockImpl(),
-                raftExecutorService,
                 eventsClientListener
         );
 
@@ -154,7 +151,7 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
                 vaultManager,
                 clusterService,
                 cmgManager,
-                mock(LogicalTopologyService.class),
+                logicalTopologyService,
                 raftManager,
                 storage
         );
@@ -164,17 +161,16 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
                 vaultManager,
                 MetastorageGroupId.INSTANCE,
                 clusterService,
-                raftConfiguration,
                 () -> completedFuture(peersAndLearners(
                         new HashMap<>(Map.of(new NetworkAddress("localhost", PORT), clusterService)),
                         addr -> true,
                         1)
                         .peers().stream().map(Peer::consistentId).collect(toSet())),
-                new LogicalTopologyServiceTestImpl(clusterService),
-                raftExecutorService,
+                logicalTopologyService,
+                raftManager,
+                topologyAwareRaftGroupServiceFactory,
                 tblsCfg,
-                clock,
-                eventsClientListener
+                clock
         );
 
         vaultManager.start();
@@ -203,7 +199,6 @@ public class PlacementDriverManagerTest extends IgniteAbstractTest {
         raftManager.stop();
         clusterService.stop();
         vaultManager.stop();
-        raftExecutorService.shutdown();
     }
 
     private static PeersAndLearners peersAndLearners(
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index 051340efd9..ea1f96402d 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -49,12 +49,6 @@ public class LeaseUpdater {
     /** Ignite logger. */
     private static final IgniteLogger LOG = Loggers.forClass(LeaseUpdater.class);
 
-    /**
-     * Cluster cLock skew. The constant determines the undefined inclusive interval to compares timestamp from various nodes.
-     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
-     */
-    private static final long CLOCK_SKEW = 7L;
-
     /** Update attempts interval in milliseconds. */
     private static final long UPDATE_LEASE_MS = 200L;
 
@@ -152,23 +146,6 @@ public class LeaseUpdater {
         }
     }
 
-    /**
-     * Compares two timestamps with the clock skew.
-     * t1, t2 comparable if t1 is not contained on [t2 - CLOCK_SKEW; t2 + CLOCK_SKEW].
-     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
-     *
-     * @param ts1 First timestamp.
-     * @param ts2 Second timestamp.
-     * @return Result of comparison can be positive or negative, or {@code 0} if timestamps are not comparable.
-     */
-    private static int compareWithClockSkew(HybridTimestamp ts1, HybridTimestamp ts2) {
-        if (ts1.getPhysical() - CLOCK_SKEW <= ts2.getPhysical() && ts1.getPhysical() + CLOCK_SKEW >= ts2.getPhysical()) {
-            return 0;
-        }
-
-        return ts1.compareTo(ts2);
-    }
-
     /**
      * Finds a node that can be the leaseholder.
      *
@@ -263,7 +240,7 @@ public class LeaseUpdater {
             HybridTimestamp now = clock.now();
 
             return lease == EMPTY_LEASE
-                    || (!candidate.equals(lease.getLeaseholder()) && compareWithClockSkew(now, lease.getLeaseExpirationTime()) > 0);
+                    || (!candidate.equals(lease.getLeaseholder()) && now.after(lease.getLeaseExpirationTime()));
         }
     }
 }
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 545a59fdb4..ea29a558d5 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -18,10 +18,10 @@
 package org.apache.ignite.internal.placementdriver;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -30,16 +30,16 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -52,8 +52,6 @@ public class PlacementDriverManager implements IgniteComponent {
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    private final RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory();
-
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
@@ -66,17 +64,14 @@ public class PlacementDriverManager implements IgniteComponent {
     /** The closure determines nodes where are participants of placement driver. */
     private final Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider;
 
-    /** Raft client future. Can contain null, if this node is not in placement driver group. */
-    private final CompletableFuture<TopologyAwareRaftGroupService> raftClientFuture;
-
-    /** Executor sends a raft requests and receives response. */
-    private final ScheduledExecutorService raftClientExecutor;
+    private final RaftManager raftManager;
 
-    /** Logical topology service. */
-    private final LogicalTopologyService logicalTopologyService;
+    private final TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory;
 
-    /** Raft configuration. */
-    private final RaftConfiguration raftConfiguration;
+    /**
+     * Raft client future. Can contain null, if this node is not in placement driver group.
+     */
+    private final CompletableFuture<TopologyAwareRaftGroupService> raftClientFuture;
 
     /** Lease tracker. */
     private final LeaseTracker leaseTracker;
@@ -84,9 +79,6 @@ public class PlacementDriverManager implements IgniteComponent {
     /** Lease updater. */
     private final LeaseUpdater leaseUpdater;
 
-    private final RaftGroupEventsClientListener raftGroupEventsClientListener;
-
-    /** The flag is true when the instance of placement driver renews leases, false when the instance only tracks leases. */
     private volatile boolean isActiveActor;
 
     /**
@@ -96,10 +88,10 @@ public class PlacementDriverManager implements IgniteComponent {
      * @param vaultManager Vault manager.
      * @param replicationGroupId Id of placement driver group.
      * @param clusterService Cluster service.
-     * @param raftConfiguration Raft configuration.
      * @param placementDriverNodesNamesProvider Provider of the set of placement driver nodes' names.
      * @param logicalTopologyService Logical topology service.
-     * @param raftClientExecutor Raft client executor.
+     * @param raftManager Raft manager.
+     * @param topologyAwareRaftGroupServiceFactory Raft client factory.
      * @param tablesCfg Table configuration.
      * @param clock Hybrid clock.
      */
@@ -108,21 +100,18 @@ public class PlacementDriverManager implements IgniteComponent {
             VaultManager vaultManager,
             ReplicationGroupId replicationGroupId,
             ClusterService clusterService,
-            RaftConfiguration raftConfiguration,
             Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider,
             LogicalTopologyService logicalTopologyService,
-            ScheduledExecutorService raftClientExecutor,
+            RaftManager raftManager,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
             TablesConfiguration tablesCfg,
-            HybridClock clock,
-            RaftGroupEventsClientListener raftGroupEventsClientListener
+            HybridClock clock
     ) {
         this.replicationGroupId = replicationGroupId;
         this.clusterService = clusterService;
-        this.raftConfiguration = raftConfiguration;
         this.placementDriverNodesNamesProvider = placementDriverNodesNamesProvider;
-        this.logicalTopologyService = logicalTopologyService;
-        this.raftClientExecutor = raftClientExecutor;
-        this.raftGroupEventsClientListener = raftGroupEventsClientListener;
+        this.raftManager = raftManager;
+        this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
 
         this.raftClientFuture = new CompletableFuture<>();
         this.leaseTracker = new LeaseTracker(vaultManager, metaStorageMgr);
@@ -144,24 +133,17 @@ public class PlacementDriverManager implements IgniteComponent {
                     String thisNodeName = clusterService.topologyService().localMember().name();
 
                     if (placementDriverNodes.contains(thisNodeName)) {
-                        leaseUpdater.init(thisNodeName);
-
-                        return TopologyAwareRaftGroupService.start(
-                                replicationGroupId,
-                                clusterService,
-                                raftMessagesFactory,
-                                raftConfiguration,
-                                PeersAndLearners.fromConsistentIds(placementDriverNodes),
-                                true,
-                                raftClientExecutor,
-                                logicalTopologyService,
-                                raftGroupEventsClientListener,
-                                true
-                            ).thenCompose(client -> {
-                                TopologyAwareRaftGroupService topologyAwareClient = (TopologyAwareRaftGroupService) client;
-
-                                return topologyAwareClient.subscribeLeader(this::onLeaderChange).thenApply(v -> topologyAwareClient);
-                            });
+                        try {
+                            leaseUpdater.init(thisNodeName);
+
+                            return raftManager.startRaftGroupService(
+                                    replicationGroupId,
+                                    PeersAndLearners.fromConsistentIds(placementDriverNodes),
+                                    topologyAwareRaftGroupServiceFactory
+                                ).thenCompose(client -> client.subscribeLeader(this::onLeaderChange).thenApply(v -> client));
+                        } catch (NodeStoppingException e) {
+                            return failedFuture(e);
+                        }
                     } else {
                         return completedFuture(null);
                     }
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index 09446af8a5..789a327f31 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -44,6 +44,24 @@ public interface RaftManager extends IgniteComponent {
             RaftGroupEventsListener eventsLsnr
     ) throws NodeStoppingException;
 
+    /**
+     * Starts a Raft group and a Raft service on the current node, using the given factory to create the Raft group service.
+     *
+     * @param nodeId Raft node ID.
+     * @param configuration Peers and Learners of the Raft group.
+     * @param lsnr Raft group listener.
+     * @param eventsLsnr Raft group events listener.
+     * @param factory Service factory.
+     * @throws NodeStoppingException If node stopping intention was detected.
+     */
+    <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+            RaftNodeId nodeId,
+            PeersAndLearners configuration,
+            RaftGroupListener lsnr,
+            RaftGroupEventsListener eventsLsnr,
+            RaftServiceFactory<T> factory
+    ) throws NodeStoppingException;
+
     /**
      * Stops a given local Raft node.
      *
@@ -77,4 +95,19 @@ public interface RaftManager extends IgniteComponent {
             ReplicationGroupId groupId,
             PeersAndLearners configuration
     ) throws NodeStoppingException;
+
+    /**
+     * Creates a Raft group service providing operations on a Raft group, using the given factory.
+     *
+     * @param groupId Raft group ID.
+     * @param configuration Peers and Learners of the Raft group.
+     * @param factory Factory that should be used to create raft service.
+     * @return Future that will be completed with an instance of a Raft group service.
+     * @throws NodeStoppingException If node stopping intention was detected.
+     */
+    <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService(
+            ReplicationGroupId groupId,
+            PeersAndLearners configuration,
+            RaftServiceFactory<T> factory
+    ) throws NodeStoppingException;
 }
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
new file mode 100644
index 0000000000..ecea7e524d
--- /dev/null
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/**
+ * Factory that can be used to create a customized Raft service.
+ */
+public interface RaftServiceFactory<T extends RaftGroupService> {
+    /**
+     * Creates Raft group service.
+     *
+     * @param groupId Group id.
+     * @param peersAndLearners Peers configuration.
+     * @param raftConfiguration Raft configuration.
+     * @param raftClientExecutor Client executor.
+     * @return Future that contains client when completes.
+     */
+    CompletableFuture<T> startRaftGroupService(
+            ReplicationGroupId groupId,
+            PeersAndLearners peersAndLearners,
+            RaftConfiguration raftConfiguration,
+            ScheduledExecutorService raftClientExecutor
+    );
+}
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index e86f83e994..d731fc75e5 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -233,6 +233,13 @@ public interface RaftGroupService {
      */
     void shutdown();
 
+    /**
+     * Reads index from the group leader.
+     *
+     * @return Future containing the index.
+     */
+    CompletableFuture<Long> readIndex();
+
     /**
      * Returns a cluster service.
      *
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 455ae5d49b..b452883af3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -46,6 +48,7 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -55,13 +58,13 @@ import org.jetbrains.annotations.TestOnly;
 //  see https://issues.apache.org/jira/browse/IGNITE-18273
 public class Loza implements RaftManager {
     /** Factory. */
-    private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
+    public static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
 
     /** Raft client pool name. */
     public static final String CLIENT_POOL_NAME = "Raft-Group-Client";
 
     /** Raft client pool size. Size was taken from jraft's TimeManager. */
-    public static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
+    private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
 
     /** Logger. */
     private static final IgniteLogger LOG = Loggers.forClass(Loza.class);
@@ -93,14 +96,12 @@ public class Loza implements RaftManager {
      * @param raftConfiguration Raft configuration.
      * @param dataPath Data path.
      * @param clock A hybrid logical clock.
-     * @param executor Executor for raft group services.
      */
     public Loza(
             ClusterService clusterNetSvc,
             RaftConfiguration raftConfiguration,
             Path dataPath,
             HybridClock clock,
-            ScheduledExecutorService executor,
             RaftGroupEventsClientListener raftGroupEventsClientListener
     ) {
         this.clusterNetSvc = clusterNetSvc;
@@ -114,7 +115,11 @@ public class Loza implements RaftManager {
 
         this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, options, raftGroupEventsClientListener);
 
-        this.executor = executor;
+        this.executor = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
+                new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),
+                        CLIENT_POOL_NAME), LOG
+                )
+        );
     }
 
     /**
@@ -136,11 +141,6 @@ public class Loza implements RaftManager {
                 raftConfiguration,
                 dataPath,
                 clock,
-                new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
-                        new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),
-                                CLIENT_POOL_NAME), LOG
-                        )
-                ),
                 new RaftGroupEventsClientListener()
         );
     }
@@ -162,6 +162,8 @@ public class Loza implements RaftManager {
 
         busyLock.block();
 
+        IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+
         raftServer.stop();
     }
 
@@ -175,6 +177,17 @@ public class Loza implements RaftManager {
         return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults());
     }
 
+    @Override
+    public <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+            RaftNodeId nodeId,
+            PeersAndLearners configuration,
+            RaftGroupListener lsnr,
+            RaftGroupEventsListener eventsLsnr,
+            RaftServiceFactory<T> factory
+    ) throws NodeStoppingException {
+        return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults(), factory);
+    }
+
     /**
      * Starts a Raft group on the current node.
      *
@@ -191,13 +204,35 @@ public class Loza implements RaftManager {
             RaftGroupListener lsnr,
             RaftGroupEventsListener eventsLsnr,
             RaftGroupOptions groupOptions
+    ) throws NodeStoppingException {
+        return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, groupOptions, null);
+    }
+
+    /**
+     * Starts a Raft group on the current node.
+     *
+     * @param nodeId Raft node ID.
+     * @param configuration Peers and Learners of the Raft group.
+     * @param lsnr Raft group listener.
+     * @param eventsLsnr Raft group events listener.
+     * @param groupOptions Options to apply to the group.
+     * @param raftServiceFactory If not null, used for creation of raft group service.
+     * @throws NodeStoppingException If node stopping intention was detected.
+     */
+    public <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNode(
+            RaftNodeId nodeId,
+            PeersAndLearners configuration,
+            RaftGroupListener lsnr,
+            RaftGroupEventsListener eventsLsnr,
+            RaftGroupOptions groupOptions,
+            @Nullable RaftServiceFactory<T> raftServiceFactory
     ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            return startRaftGroupNodeInternal(nodeId, configuration, lsnr, eventsLsnr, groupOptions);
+            return startRaftGroupNodeInternal(nodeId, configuration, lsnr, eventsLsnr, groupOptions, raftServiceFactory);
         } finally {
             busyLock.leaveBusy();
         }
@@ -219,12 +254,30 @@ public class Loza implements RaftManager {
         }
     }
 
-    private CompletableFuture<RaftGroupService> startRaftGroupNodeInternal(
+    @Override
+    public <T extends RaftGroupService> CompletableFuture<T> startRaftGroupService(
+            ReplicationGroupId groupId,
+            PeersAndLearners configuration,
+            RaftServiceFactory<T> factory
+    ) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            return factory.startRaftGroupService(groupId, configuration, raftConfiguration, executor);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private <T extends RaftGroupService> CompletableFuture<T> startRaftGroupNodeInternal(
             RaftNodeId nodeId,
             PeersAndLearners configuration,
             RaftGroupListener lsnr,
             RaftGroupEventsListener raftGrpEvtsLsnr,
-            RaftGroupOptions groupOptions
+            RaftGroupOptions groupOptions,
+            @Nullable RaftServiceFactory<T> raftServiceFactory
     ) {
         if (LOG.isInfoEnabled()) {
             LOG.info("Start new raft node={} with initial configuration={}", nodeId, configuration);
@@ -239,7 +292,9 @@ public class Loza implements RaftManager {
             ));
         }
 
-        return startRaftGroupServiceInternal(nodeId.groupId(), configuration);
+        return raftServiceFactory == null
+                ? (CompletableFuture<T>) startRaftGroupServiceInternal(nodeId.groupId(), configuration)
+                : raftServiceFactory.startRaftGroupService(nodeId.groupId(), configuration, raftConfiguration, executor);
     }
 
     private CompletableFuture<RaftGroupService> startRaftGroupServiceInternal(
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index b5156316fd..d89fe751bd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -71,6 +71,7 @@ import org.apache.ignite.raft.jraft.rpc.ActionRequest;
 import org.apache.ignite.raft.jraft.rpc.ActionResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.SMErrorResponse;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
 import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
@@ -460,6 +461,18 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         busyLock.block();
     }
 
+    @Override
+    public CompletableFuture<Long> readIndex() {
+        // Send to the leader when leader() reports one; otherwise pick a peer via randomNode().
+        Peer knownLeader = leader();
+        Peer target = knownLeader != null ? knownLeader : randomNode();
+
+        Function<Peer, ? extends NetworkMessage> readIndexRequest = peer -> factory.readIndexRequest().groupId(groupId).build();
+
+        CompletableFuture<ReadIndexResponse> response = this.<ReadIndexResponse>sendWithRetry(target, readIndexRequest);
+        return response.thenApply(ReadIndexResponse::index);
+    }
+
     @Override
     public ClusterService clusterService() {
         return cluster;
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index b2893a9455..c92067d7f0 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -21,8 +21,10 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toUnmodifiableList;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.apache.ignite.raft.TestWriteCommand.testWriteCommand;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -82,6 +84,7 @@ import org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
 import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -559,6 +562,28 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest {
         assertThat(fut.thenApply(GetLeaderResponse::currentTerm), willBe(equalTo(CURRENT_TERM)));
     }
 
+    @Test
+    public void testReadIndex() {
+        RaftGroupService service = startRaftGroupService(NODES, false);
+        mockReadIndex(false);
+
+        CompletableFuture<Long> fut = service.readIndex();
+
+        assertThat(fut, willSucceedFast());
+
+        assertEquals(1L, fut.join());
+    }
+
+    @Test
+    public void testReadIndexWithMessageSendTimeout() {
+        RaftGroupService service = startRaftGroupService(NODES, false);
+        mockReadIndex(true);
+
+        CompletableFuture<Long> fut = service.readIndex();
+
+        assertThat(fut, willThrowFast(TimeoutException.class));
+    }
+
     private RaftGroupService startRaftGroupService(List<Peer> peers, boolean getLeader) {
         PeersAndLearners memberConfiguration = PeersAndLearners.fromPeers(peers, Set.of());
 
@@ -570,6 +595,17 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest {
         return service.join();
     }
 
+    /**
+     * Mock read index request.
+     */
+    private void mockReadIndex(boolean timeout) {
+        when(messagingService.invoke(any(ClusterNode.class), any(ReadIndexRequest.class), anyLong()))
+                .then(invocation -> timeout
+                        ? failedFuture(new TimeoutException())
+                        : completedFuture(FACTORY.readIndexResponse().index(1L).build())
+                );
+    }
+
     /**
      * Mocks sending {@link ActionRequest}s.
      *
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index 34b50e0c45..e21bb7d4b1 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -18,17 +18,29 @@
 apply from: "$rootDir/buildscripts/java-core.gradle"
 apply from: "$rootDir/buildscripts/publishing.gradle"
 apply from: "$rootDir/buildscripts/java-junit5.gradle"
-
+apply from: "$rootDir/buildscripts/java-integration-test.gradle"
+apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
 
 dependencies {
     annotationProcessor project(":ignite-network-annotation-processor")
     implementation project(':ignite-core')
+    implementation project(':ignite-raft')
     implementation project(':ignite-raft-api')
+    implementation project(':ignite-configuration')
     implementation project(':ignite-configuration-api')
+    implementation project(':ignite-cluster-management')
     implementation project(':ignite-network-api')
+    implementation project(':ignite-placement-driver-api')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
 
+    integrationTestImplementation project(':ignite-raft')
+    integrationTestImplementation project(':ignite-raft-api')
+
+    testImplementation(testFixtures(project(':ignite-core')))
+    testImplementation(testFixtures(project(':ignite-network')))
+    testImplementation(testFixtures(project(':ignite-configuration')))
+    testImplementation libs.hamcrest.core
     testImplementation libs.mockito.core
     testImplementation libs.mockito.junit
 }
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
similarity index 97%
rename from modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
rename to modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index b58507607a..b4f90d47aa 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -310,6 +310,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
      */
     public CompletableFuture<Void> unsubscribeLeader() {
         serverEventHandler.setOnLeaderElectedCallback(null);
+        serverEventHandler.resetLeader();
 
         var peers = peers();
         var futs = new CompletableFuture[peers.size()];
@@ -337,7 +338,9 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
 
     @Override
     public @Nullable Peer leader() {
-        return raftClient.leader();
+        Peer leader = serverEventHandler.leader();
+
+        return leader == null ? raftClient.leader() : leader;
     }
 
     @Override
@@ -422,6 +425,11 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
         raftClient.shutdown();
     }
 
+    @Override
+    public CompletableFuture<Long> readIndex() {
+        return raftClient.readIndex();
+    }
+
     @Override
     public ClusterService clusterService() {
         return raftClient.clusterService();
@@ -434,6 +442,9 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
         /** A term of last elected leader. */
         private long term = 0;
 
+        /** Last elected leader. Volatile: written inside synchronized onLeaderElected, read by leader() without the monitor. */
+        private volatile Peer leaderPeer;
+
         /** A leader elected callback. */
         private BiConsumer<ClusterNode, Long> onLeaderElectedCallback;
 
@@ -446,6 +457,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
         private synchronized void onLeaderElected(ClusterNode node, long term) {
             if (onLeaderElectedCallback != null && term > this.term) {
                 this.term = term;
+                this.leaderPeer = new Peer(node.name());
 
                 onLeaderElectedCallback.accept(node, term);
             }
@@ -473,5 +485,13 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
         public void accept(ClusterNode clusterNode, Long term) {
             onLeaderElected(clusterNode, term);
         }
+
+        Peer leader() {
+            return leaderPeer;
+        }
+
+        void resetLeader() {
+            leaderPeer = null;
+        }
     }
 }
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
new file mode 100644
index 0000000000..8bb79db8a9
--- /dev/null
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
+
+/**
+ * Factory for creating {@link TopologyAwareRaftGroupService} instances.
+ */
+public class TopologyAwareRaftGroupServiceFactory implements RaftServiceFactory<TopologyAwareRaftGroupService> {
+    private final ClusterService clusterService;
+
+    private final LogicalTopologyService logicalTopologyService;
+
+    private final RaftMessagesFactory raftMessagesFactory;
+
+    private final RaftGroupEventsClientListener eventsClientListener;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param logicalTopologyService Logical topology service.
+     * @param raftMessagesFactory Raft messages factory.
+     * @param eventsClientListener Raft events client listener.
+     */
+    public TopologyAwareRaftGroupServiceFactory(
+            ClusterService clusterService,
+            LogicalTopologyService logicalTopologyService,
+            RaftMessagesFactory raftMessagesFactory,
+            RaftGroupEventsClientListener eventsClientListener
+    ) {
+        this.clusterService = clusterService;
+        this.logicalTopologyService = logicalTopologyService;
+        this.raftMessagesFactory = raftMessagesFactory;
+        this.eventsClientListener = eventsClientListener;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<TopologyAwareRaftGroupService> startRaftGroupService(
+            ReplicationGroupId groupId,
+            PeersAndLearners peersAndLearners,
+            RaftConfiguration raftConfiguration,
+            ScheduledExecutorService raftClientExecutor
+    ) {
+        return TopologyAwareRaftGroupService.start(
+                    groupId,
+                    clusterService,
+                    raftMessagesFactory,
+                    raftConfiguration,
+                    peersAndLearners,
+                    true,
+                    raftClientExecutor,
+                    logicalTopologyService,
+                    eventsClientListener,
+                    true
+            ).thenApply(TopologyAwareRaftGroupService.class::cast);
+    }
+}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index e646c3760e..df11a77ba3 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -17,33 +17,87 @@
 
 package org.apache.ignite.internal.replicator;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccess;
+
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkMessage;
 
 /**
  * Replica server.
  */
 public class Replica {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(Replica.class);
+
+    private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+
     /** Replica group identity, this id is the same as the considered partition's id. */
     private final ReplicationGroupId replicaGrpId;
 
     /** Replica listener. */
     private final ReplicaListener listener;
 
+    /** Storage index tracker. */
+    private final PendingComparableValuesTracker<Long> storageIndexTracker;
+
+    /** Topology aware Raft client. */
+    private final TopologyAwareRaftGroupService raftClient;
+
+    /** Instance of the local node. */
+    private final ClusterNode localNode;
+
+    // TODO IGNITE-18960 after replica inoperability logic is introduced, this future should be replaced with something like
+    //     VersionedValue (so that PlacementDriverMessages would wait for new leader election)
+    private CompletableFuture<AtomicReference<ClusterNode>> leaderFuture = new CompletableFuture<>();
+
+    private AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+
+    /** Latest lease expiration time. */
+    private volatile HybridTimestamp leaseExpirationTime = null;
+
     /**
      * The constructor of a replica server.
      *
      * @param replicaGrpId Replication group id.
      * @param listener Replica listener.
+     * @param storageIndexTracker Storage index tracker.
+     * @param raftClient Topology aware Raft client.
+     * @param localNode Instance of the local node.
      */
     public Replica(
             ReplicationGroupId replicaGrpId,
-            ReplicaListener listener
+            ReplicaListener listener,
+            PendingComparableValuesTracker<Long> storageIndexTracker,
+            TopologyAwareRaftGroupService raftClient,
+            ClusterNode localNode
     ) {
         this.replicaGrpId = replicaGrpId;
         this.listener = listener;
+        this.storageIndexTracker = storageIndexTracker;
+        this.raftClient = raftClient;
+        this.localNode = localNode;
+
+        raftClient.subscribeLeader(this::onLeaderElected);
     }
 
     /**
@@ -69,4 +123,122 @@ public class Replica {
     public ReplicationGroupId groupId() {
         return replicaGrpId;
     }
+
+    private void onLeaderElected(ClusterNode clusterNode, Long term) {
+        leaderRef.set(clusterNode);
+        // The future is completed at most once and carries the reference itself, so leaderFuture() keeps observing later elections.
+        if (!leaderFuture.isDone()) {
+            leaderFuture.complete(leaderRef);
+        }
+    }
+
+    private CompletableFuture<ClusterNode> leaderFuture() {
+        return leaderFuture.thenApply(AtomicReference::get);
+    }
+
+    /**
+     * Processes a placement driver message. Only {@link LeaseGrantedMessage} is handled here.
+     *
+     * @param msg Message to process.
+     * @return Future with the response, or a future failed with {@link AssertionError} if the message type is not supported.
+     */
+    public CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage(PlacementDriverReplicaMessage msg) {
+        if (msg instanceof LeaseGrantedMessage) {
+            return processLeaseGrantedMessage((LeaseGrantedMessage) msg);
+        }
+
+        return failedFuture(new AssertionError("Unknown message type, msg=" + msg));
+    }
+
+    /**
+     * Processes a lease granted message. A forced lease is accepted after the local state catches up; leadership is then transferred
+     * to this node unless it is already the group leader. A non-forced lease is accepted only by the leader, else a redirect is proposed.
+     *
+     * @param msg Message to process.
+     * @return Future with the response: either a lease acceptance or a refusal carrying the name of the current group leader.
+     */
+    public CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) {
+        LOG.info("Received LeaseGrantedMessage for replica belonging to group=" + groupId() + ", force=" + msg.force());
+
+        return leaderFuture().thenCompose(leader -> {
+            HybridTimestamp leaseExpirationTime = this.leaseExpirationTime;
+
+            if (leaseExpirationTime != null) {
+                assert msg.leaseExpirationTime().after(leaseExpirationTime) : "Invalid lease expiration time in message, msg=" + msg;
+            }
+
+            if (msg.force()) {
+                // Replica must wait till storage index reaches the current leader's index to make sure that all updates made on the
+                // group leader are received.
+
+                return waitForActualState(msg.leaseExpirationTime().getPhysical())
+                        .thenCompose(v -> {
+                            CompletableFuture<LeaseGrantedMessageResponse> respFut =
+                                    acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime());
+
+                            if (leader.equals(localNode)) {
+                                return respFut;
+                            } else {
+                                return raftClient.transferLeadership(new Peer(localNode.name()))
+                                        .thenCompose(ignored -> respFut);
+                            }
+                        });
+            } else {
+                if (leader.equals(localNode)) {
+                    return waitForActualState(msg.leaseExpirationTime().getPhysical())
+                            .thenCompose(v -> acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()));
+                } else {
+                    return proposeLeaseRedirect(leader);
+                }
+            }
+        });
+    }
+
+    private CompletableFuture<LeaseGrantedMessageResponse> acceptLease(
+            HybridTimestamp leaseStartTime,
+            HybridTimestamp leaseExpirationTime
+    ) {
+        LOG.info("Lease accepted, group=" + groupId() + ", leaseStartTime=" + leaseStartTime + ", leaseExpirationTime="
+                + leaseExpirationTime);
+        // Remember the accepted expiration time: the next lease granted message is asserted to carry a later one.
+        this.leaseExpirationTime = leaseExpirationTime;
+
+        LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
+                .accepted(true)
+                .build();
+
+        return completedFuture(resp);
+    }
+
+    private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(ClusterNode groupLeader) {
+        LOG.info("Proposing lease redirection, proposed node=" + groupLeader);
+        // Decline the lease and name the given group leader as the node the lease should be redirected to.
+        LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
+                .accepted(false)
+                .redirectProposal(groupLeader.name())
+                .build();
+
+        return completedFuture(resp);
+    }
+
+    /**
+     * Tries to read the index from the group leader and waits for this index to appear in the local storage. Can possibly return
+     * a future failed with a timeout exception; in this case the replica does not answer to the placement driver, because the
+     * response would be useless. The placement driver should handle this.
+     *
+     * @param expirationTime Lease expiration time, in milliseconds comparable with {@code currentTimeMillis()}.
+     * @return Future that is completed when the local storage catches up with the leader's index as of the moment of the request.
+     */
+    private CompletableFuture<Void> waitForActualState(long expirationTime) {
+        LOG.info("Waiting for actual storage state, group=" + groupId());
+
+        long timeout = expirationTime - currentTimeMillis();
+        if (timeout <= 0) {
+            return failedFuture(new TimeoutException());
+        }
+
+        return retryOperationUntilSuccess(raftClient::readIndex, e -> currentTimeMillis() > expirationTime, Runnable::run)
+                .orTimeout(timeout, TimeUnit.MILLISECONDS)
+                .thenCompose(storageIndexTracker::waitFor);
+    }
 }
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index bcbaff0680..9a72bcea3a 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.replicator;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.util.Set;
@@ -31,6 +32,9 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
 import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -43,8 +47,10 @@ import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NetworkMessageHandler;
@@ -79,6 +85,9 @@ public class ReplicaManager implements IgniteComponent {
     /** Replica message handler. */
     private final NetworkMessageHandler handler;
 
+    /** Message handler for placement driver messages. */
+    private final NetworkMessageHandler placementDriverMessageHandler;
+
     /** Replicas. */
     private final ConcurrentHashMap<ReplicationGroupId, CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
 
@@ -93,7 +102,7 @@ public class ReplicaManager implements IgniteComponent {
     private final Set<Class<?>> messageGroupsToHandle;
 
     /**
-     * Constructor for a    replica service.
+     * Constructor for a replica service.
      *
      * @param clusterNetSvc Cluster network service.
      * @param clock A hybrid logical clock.
@@ -106,80 +115,111 @@ public class ReplicaManager implements IgniteComponent {
         this.clusterNetSvc = clusterNetSvc;
         this.clock = clock;
         this.messageGroupsToHandle = messageGroupsToHandle;
-        this.handler = (message, senderConsistentId, correlationId) -> {
-            if (!busyLock.enterBusy()) {
-                throw new IgniteException(new NodeStoppingException());
+        this.handler = this::onReplicaMessageReceived;
+        this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived;
+    }
+
+    private void onReplicaMessageReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            if (!(message instanceof ReplicaRequest)) {
+                return;
             }
 
-            try {
-                if (!(message instanceof ReplicaRequest)) {
-                    return;
-                }
+            ReplicaRequest request = (ReplicaRequest) message;
 
-                ReplicaRequest request = (ReplicaRequest) message;
+            // Notify the sender that the Replica is created and ready to process requests.
+            if (request instanceof AwaitReplicaRequest) {
+                replicas.compute(request.groupId(), (replicationGroupId, replicaFut) -> {
+                    if (replicaFut == null) {
+                        replicaFut = new CompletableFuture<>();
+                    }
 
-                // Notify the sender that the Replica is created and ready to process requests.
-                if (request instanceof AwaitReplicaRequest) {
-                    replicas.compute(request.groupId(), (replicationGroupId, replicaFut) -> {
-                        if (replicaFut == null) {
-                            replicaFut = new CompletableFuture<>();
-                        }
+                    if (!replicaFut.isDone()) {
+                        replicaFut.thenCompose(
+                                ignore -> {
+                                    IgniteUtils.inBusyLock(
+                                            busyLock,
+                                            () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
+                                    );
 
-                        if (!replicaFut.isDone()) {
-                            replicaFut.thenCompose(
-                                    ignore -> {
-                                        IgniteUtils.inBusyLock(
-                                                busyLock,
-                                                () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)
-                                        );
+                                    return null;
+                                }
+                        );
 
-                                        return null;
-                                    }
-                            );
+                        return replicaFut;
+                    } else {
+                        IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(senderConsistentId, correlationId));
 
-                            return replicaFut;
-                        } else {
-                            IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(senderConsistentId, correlationId));
+                        return replicaFut;
+                    }
+                });
 
-                            return replicaFut;
-                        }
-                    });
+                return;
+            }
 
-                    return;
-                }
+            CompletableFuture<Replica> replicaFut = replicas.get(request.groupId());
+
+            HybridTimestamp requestTimestamp = extractTimestamp(request);
+
+            if (replicaFut == null || !replicaFut.isDone()) {
+                sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, request.groupId(), requestTimestamp);
+
+                return;
+            }
 
-                CompletableFuture<Replica> replicaFut = replicas.get(request.groupId());
+            // replicaFut is always completed here.
+            CompletableFuture<?> result = replicaFut.join().processRequest(request);
 
-                HybridTimestamp requestTimestamp = extractTimestamp(request);
+            result.handle((res, ex) -> {
+                NetworkMessage msg;
 
-                if (replicaFut == null || !replicaFut.isDone()) {
-                    sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, request, requestTimestamp);
+                if (ex == null) {
+                    msg = prepareReplicaResponse(requestTimestamp, res);
+                } else {
+                    LOG.warn("Failed to process replica request [request={}]", ex, request);
 
-                    return;
+                    msg = prepareReplicaErrorResponse(requestTimestamp, ex);
                 }
 
-                // replicaFut is always completed here.
-                CompletableFuture<?> result = replicaFut.join().processRequest(request);
+                clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId);
 
-                result.handle((res, ex) -> {
-                    NetworkMessage msg;
+                return null;
+            });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
 
-                    if (ex == null) {
-                        msg = prepareReplicaResponse(requestTimestamp, res);
-                    } else {
-                        LOG.warn("Failed to process replica request [request={}]", ex, request);
+    private void onPlacementDriverMessageReceived(NetworkMessage msg0, String senderConsistentId, @Nullable Long correlationId) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
 
-                        msg = prepareReplicaErrorResponse(requestTimestamp, ex);
-                    }
+        try {
+            assert msg0 instanceof PlacementDriverReplicaMessage : "Unexpected message type, msg=" + msg0;
 
-                    clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId);
+            PlacementDriverReplicaMessage msg = (PlacementDriverReplicaMessage) msg0;
 
-                    return null;
-                });
-            } finally {
-                busyLock.leaveBusy();
-            }
-        };
+            CompletableFuture<Replica> replicaFut = replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture<>());
+
+            replicaFut
+                    .thenCompose(replica -> replica.processPlacementDriverMessage(msg))
+                    .handle((response, ex) -> {
+                        if (ex == null) {
+                            clusterNetSvc.messagingService().respond(senderConsistentId, response, correlationId);
+                        } else {
+                            LOG.error("Failed to process placement driver message [msg={}]", ex, msg);
+                        }
+
+                        return null;
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -187,20 +227,25 @@ public class ReplicaManager implements IgniteComponent {
      *
      * @param replicaGrpId Replication group id.
      * @param listener Replica listener.
+     * @param raftClient Topology aware Raft client.
+     * @param storageIndexTracker Storage index tracker.
+     *
      * @return New replica.
      * @throws NodeStoppingException If node is stopping.
      * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been started.
      */
     public Replica startReplica(
             ReplicationGroupId replicaGrpId,
-            ReplicaListener listener
+            ReplicaListener listener,
+            TopologyAwareRaftGroupService raftClient,
+            PendingComparableValuesTracker<Long> storageIndexTracker
     ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            return startReplicaInternal(replicaGrpId, listener);
+            return startReplicaInternal(replicaGrpId, listener, raftClient, storageIndexTracker);
         } finally {
             busyLock.leaveBusy();
         }
@@ -211,20 +256,25 @@ public class ReplicaManager implements IgniteComponent {
      *
      * @param replicaGrpId   Replication group id.
      * @param listener Replica listener.
+     * @param raftClient Topology aware Raft client.
+     * @param storageIndexTracker Storage index tracker.
      * @return New replica.
      */
     private Replica startReplicaInternal(
             ReplicationGroupId replicaGrpId,
-            ReplicaListener listener
+            ReplicaListener listener,
+            TopologyAwareRaftGroupService raftClient,
+            PendingComparableValuesTracker<Long> storageIndexTracker
     ) {
-        Replica newReplica = new Replica(replicaGrpId, listener);
+        ClusterNode localNode = clusterNetSvc.topologyService().localMember();
+        Replica newReplica = new Replica(replicaGrpId, listener, storageIndexTracker, raftClient, localNode);
 
         replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
             if (replicaFut == null) {
-                return CompletableFuture.completedFuture(newReplica);
+                return completedFuture(newReplica);
             } else {
                 if (replicaFut.isDone() && !replicaFut.isCancelled() && !replicaFut.isCompletedExceptionally()) {
-                    return CompletableFuture.completedFuture(newReplica);
+                    return completedFuture(newReplica);
                 }
 
                 replicaFut.complete(newReplica);
@@ -269,6 +319,7 @@ public class ReplicaManager implements IgniteComponent {
     @Override
     public void start() {
         clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, handler);
+        clusterNetSvc.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, placementDriverMessageHandler);
         messageGroupsToHandle.forEach(mg -> clusterNetSvc.messagingService().addMessageHandler(mg, handler));
         scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate(
                 this::idleSafeTimeSync,
@@ -309,8 +360,8 @@ public class ReplicaManager implements IgniteComponent {
     private void sendReplicaUnavailableErrorResponse(
             String senderConsistentId,
             @Nullable Long correlationId,
-            ReplicaRequest request,
-            HybridTimestamp requestTimestamp
+            ReplicationGroupId groupId,
+            @Nullable HybridTimestamp requestTimestamp
     ) {
         if (requestTimestamp != null) {
             clusterNetSvc.messagingService().respond(
@@ -319,7 +370,7 @@ public class ReplicaManager implements IgniteComponent {
                             .errorTimestampAwareReplicaResponse()
                             .throwable(
                                     new ReplicaUnavailableException(
-                                            request.groupId(),
+                                            groupId,
                                             clusterNetSvc.topologyService().localMember())
                             )
                             .timestamp(clock.update(requestTimestamp))
@@ -332,7 +383,7 @@ public class ReplicaManager implements IgniteComponent {
                             .errorReplicaResponse()
                             .throwable(
                                     new ReplicaUnavailableException(
-                                        request.groupId(),
+                                        groupId,
                                         clusterNetSvc.topologyService().localMember())
                             )
                             .build(),
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
similarity index 84%
rename from modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
rename to modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
index 52bda8200a..b41e5e6485 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.raft.client;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest.TestReplicationGroup.GROUP_ID;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -68,7 +69,6 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.LeaderChangeNotification;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -85,10 +85,10 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
     private static final int PORT_BASE = 1234;
 
     @InjectConfiguration
-    protected RaftConfiguration raftConfiguration;
+    private RaftConfiguration raftConfiguration;
 
     /** RPC executor. */
-    protected ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", log));
+    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", log));
 
     @Test
     public void testOneNodeReplicationGroup(TestInfo testInfo) throws Exception {
@@ -113,8 +113,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
 
         assertNotNull(leader);
 
-        afterInitCheckConditionWithWait(leader.name());
-
         stopCluster(clusterServices, raftServers, List.of(raftClient), nodes);
     }
 
@@ -173,14 +171,10 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
 
         log.info("Leader: " + leader);
 
-        afterInitCheckConditionWithWait(leader.name());
-
         var raftServiceToStop = raftServers.remove(new NetworkAddress("localhost", leader.address().port()));
         raftServiceToStop.stopRaftNodes(GROUP_ID);
         raftServiceToStop.stop();
 
-        afterNodeStop(leader.name());
-
         clusterServices.remove(new NetworkAddress("localhost", leader.address().port())).stop();
 
         assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()), 10_000));
@@ -188,8 +182,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
 
         log.info("New Leader: " + leaderRef.get());
 
-        afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
-
         raftClientNoInitialNotify.refreshLeader().get();
 
         assertEquals(raftClientNoInitialNotify.leader().consistentId(), leaderRef.get().name());
@@ -254,8 +246,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
 
         log.info("Leader: " + leader);
 
-        afterInitCheckConditionWithWait(leader.name());
-
         Peer newLeaderPeer = raftClient.peers().stream().filter(peer -> !leader.name().equals(peer.consistentId())).findAny().get();
 
         log.info("Peer to transfer leader: " + newLeaderPeer);
@@ -271,8 +261,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
 
         log.info("New Leader: " + leaderRef.get());
 
-        afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
-
         raftClient.refreshLeader().get();
 
         assertEquals(raftClient.leader().consistentId(), leaderRef.get().name());
@@ -359,9 +347,11 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
                 var localPeer = peersAndLearners.peers().stream()
                         .filter(peer -> peer.consistentId().equals(cluster.topologyService().localMember().name())).findAny().get();
 
+                var dataPath = workDir.resolve("raft_" + localPeer.consistentId());
+
                 var raftServer = new JraftServerImpl(
                         cluster,
-                        workDir.resolve("raft_" + localPeer.consistentId()),
+                        dataPath,
                         new NodeOptions(),
                         eventsClientListener
                 );
@@ -375,8 +365,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
                 );
 
                 raftServers.put(addr, raftServer);
-
-                afterNodeStart(localPeer.consistentId(), cluster, placementDriverNodesNames, eventsClientListener);
             }
 
             if (addr.port() == clientPort) {
@@ -449,77 +437,6 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
         return addresses;
     }
 
-    @AfterEach
-    protected void afterTest() throws Exception {
-        // No-op.
-    }
-
-    /**
-     * The method is called after every node of the cluster starts.
-     *
-     * @param nodeName Node name.
-     * @param clusterService Cluster service.
-     * @param placementDriverNodesNames Names of all nodes in raft group.
-     * @param eventsClientListener Raft events listener for client.
-     */
-    protected void afterNodeStart(
-            String nodeName,
-            ClusterService clusterService,
-            Set<String> placementDriverNodesNames,
-            RaftGroupEventsClientListener eventsClientListener
-    ) {
-        // No-op.
-    }
-
-    /**
-     * The method is called after every node of the cluster stops.
-     *
-     * @param nodeName Node name.
-     */
-    protected void afterNodeStop(String nodeName) {
-        // No-op.
-    }
-
-    /**
-     * Checks the condition after cluster and raft clients initialization, waiting for this condition.
-     *
-     * @param leaderName Current leader name.
-     * @throws InterruptedException If failed.
-     */
-    private void afterInitCheckConditionWithWait(String leaderName) throws InterruptedException {
-        assertTrue(waitForCondition(() -> afterInitCheckCondition(leaderName), 10_000));
-    }
-
-    /**
-     * Checks the condition after cluster and raft clients initialization.
-     *
-     * @param leaderName Current leader name.
-     * @return Condition result.
-     */
-    protected boolean afterInitCheckCondition(String leaderName) {
-        return true;
-    }
-
-    /**
-     * Checks the condition after leader change, waiting for this condition.
-     *
-     * @param leaderName Current leader name.
-     * @throws InterruptedException If failed.
-     */
-    private void afterLeaderChangeCheckConditionWithWait(String leaderName) throws InterruptedException {
-        assertTrue(waitForCondition(() -> afterLeaderChangeCheckCondition(leaderName), 10_000));
-    }
-
-    /**
-     * Checks the condition after leader change.
-     *
-     * @param leaderName Current leader name.
-     * @return Condition result.
-     */
-    protected boolean afterLeaderChangeCheckCondition(String leaderName) {
-        return true;
-    }
-
     /**
      * Replication test group class.
      */
@@ -563,7 +480,7 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
     /**
      * Test implementation of {@link LogicalTopologyService}.
      */
-    protected static class LogicalTopologyServiceTestImpl implements LogicalTopologyService {
+    private static class LogicalTopologyServiceTestImpl implements LogicalTopologyService {
         private final ClusterService clusterService;
 
         public LogicalTopologyServiceTestImpl(ClusterService clusterService) {
@@ -582,12 +499,12 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest {
 
         @Override
         public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
-            return CompletableFuture.completedFuture(new LogicalTopologySnapshot(1, clusterService.topologyService().allMembers()));
+            return completedFuture(new LogicalTopologySnapshot(1, clusterService.topologyService().allMembers()));
         }
 
         @Override
         public CompletableFuture<Set<ClusterNode>> validatedNodesOnLeader() {
-            return CompletableFuture.completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
+            return completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
         }
     }
 }
diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
new file mode 100644
index 0000000000..da4e7a5b11
--- /dev/null
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for placement driver messages processing on replica side.
+ */
+public class PlacementDriverReplicaSideTest {
+    private static final ReplicationGroupId GRP_ID = new ReplicationGroupId() {
+    };
+
+    private static final ClusterNode LOCAL_NODE = new ClusterNode("id0", "name0", new NetworkAddress("localhost", 1234));
+    private static final ClusterNode ANOTHER_NODE = new ClusterNode("id1", "name`", new NetworkAddress("localhost", 2345));
+
+    private static final PlacementDriverMessagesFactory MSG_FACTORY = new PlacementDriverMessagesFactory();
+
+    private Replica replica; // Replica under test; recreated in beforeEach().
+
+    private AtomicReference<BiConsumer<ClusterNode, Long>> callbackHolder = new AtomicReference<>(); // Set by subscribeLeader() mock.
+
+    private PendingComparableValuesTracker<Long> storageIndexTracker; // Handed to the replica; advanced by updateIndex().
+
+    private AtomicLong indexOnLeader = new AtomicLong(0); // Value returned by the readIndex() mock.
+
+    private Peer currentLeader = null; // Last peer passed to the transferLeadership() mock.
+
+    private int countOfTimeoutExceptionsOnReadIndexToThrow = 0; // Remaining readIndex() mock calls that fail with TimeoutException.
+
+    private Replica startReplica() { // Creates a Replica for LOCAL_NODE backed by a mocked Raft client.
+        TopologyAwareRaftGroupService raftClient = mock(TopologyAwareRaftGroupService.class);
+
+        when(raftClient.subscribeLeader(any())).thenAnswer(invocationOnMock -> {
+            BiConsumer<ClusterNode, Long> callback = invocationOnMock.getArgument(0);
+            callbackHolder.set(callback); // Kept so that leaderElection() can invoke it later.
+
+            return completedFuture(null);
+        });
+
+        when(raftClient.transferLeadership(any())).thenAnswer(invocationOnMock -> {
+            Peer peer = invocationOnMock.getArgument(0);
+            currentLeader = peer; // Records the transfer target; the mock reports success immediately.
+
+            return completedFuture(null);
+        });
+
+        when(raftClient.readIndex()).thenAnswer(invocationOnMock -> {
+            if (countOfTimeoutExceptionsOnReadIndexToThrow > 0) {
+                countOfTimeoutExceptionsOnReadIndexToThrow--; // Each failed call consumes one planned timeout.
+                return failedFuture(new TimeoutException());
+            } else {
+                return completedFuture(indexOnLeader.get()); // Otherwise answer with the configured leader index.
+            }
+        });
+
+        Replica replica = new Replica(
+                GRP_ID,
+                mock(ReplicaListener.class),
+                storageIndexTracker,
+                raftClient,
+                LOCAL_NODE
+        );
+
+        return replica;
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        storageIndexTracker = new PendingComparableValuesTracker<>(0L); // Tracker starts at 0; tests advance it with updateIndex().
+        indexOnLeader.set(1L); // The readIndex() mock will report index 1.
+        currentLeader = null; // No leadership transfer has been requested yet.
+        countOfTimeoutExceptionsOnReadIndexToThrow = 0; // readIndex() succeeds unless a test overrides this.
+        replica = startReplica(); // Must come last: startReplica() hands storageIndexTracker to the Replica.
+    }
+
+    /**
+     * Imitates a leader election by invoking the callback captured from {@code subscribeLeader} with the node and a fixed {@code 1L}.
+     *
+     * @param leader Node to report as the elected leader.
+     */
+    private void leaderElection(ClusterNode leader) {
+        if (callbackHolder.get() != null) { // No-op when the replica has not subscribed through the mock.
+            callbackHolder.get().accept(leader, 1L);
+        }
+    }
+
+    /**
+     * Builds a lease granted message and passes it directly to {@link Replica#processPlacementDriverMessage}, bypassing the network.
+     *
+     * @param leaseStartTime Value for the message's lease start time.
+     * @param leaseExpirationTime Value for the message's lease expiration time.
+     * @param force Value for the message's {@code force} flag.
+     * @return Future with the replica's result cast to {@link LeaseGrantedMessageResponse}.
+     */
+    private CompletableFuture<LeaseGrantedMessageResponse> sendLeaseGranted(
+            HybridTimestamp leaseStartTime,
+            HybridTimestamp leaseExpirationTime,
+            boolean force
+    ) {
+        PlacementDriverReplicaMessage msg = MSG_FACTORY.leaseGrantedMessage()
+                .leaseStartTime(leaseStartTime)
+                .leaseExpirationTime(leaseExpirationTime)
+                .force(force)
+                .build();
+
+        return replica.processPlacementDriverMessage(msg).thenApply(LeaseGrantedMessageResponse.class::cast);
+    }
+
+    private HybridTimestamp hts(long physical) {
+        return new HybridTimestamp(currentTimeMillis() + physical * 1000, 0); // "physical" is an offset in seconds from now.
+    }
+
+    private void updateIndex(long index) {
+        storageIndexTracker.update(index); // Advances the tracker instance that was passed to the Replica.
+    }
+
+    @Test
+    public void replicationGroupReadinessAwait() {
+        updateIndex(1L); // Same value as the index reported by the readIndex() mock.
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(10), hts(20), false);
+        assertFalse(respFut.isDone()); // No leader has been reported yet, so no response is expected.
+        leaderElection(LOCAL_NODE);
+        assertTrue(respFut.isDone()); // The response must be ready as soon as the election callback returns.
+    }
+
+    @Test
+    public void replicationGroupReadinessAwaitAnotherNodeLeader() {
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(10), hts(20), false);
+        assertFalse(respFut.isDone()); // No leader has been reported yet, so no response is expected.
+        leaderElection(ANOTHER_NODE);
+        assertTrue(respFut.isDone()); // Completes even though the storage index was never advanced in this test.
+    }
+
+    @Test
+    public void testGrantLeaseToLeader() {
+        leaderElection(LOCAL_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(10), hts(20), false);
+
+        updateIndex(1L); // Same value as the index reported by the readIndex() mock.
+        assertTrue(respFut.isDone());
+
+        LeaseGrantedMessageResponse resp = respFut.join();
+        assertTrue(resp.accepted()); // The local node is the reported leader, so the grant is expected to be accepted.
+        assertNull(resp.redirectProposal()); // An accepted grant must not carry a redirect.
+    }
+
+    @Test
+    public void testGrantLeaseToNonLeader() {
+        leaderElection(ANOTHER_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(10), hts(20), false);
+
+        assertTrue(respFut.isDone()); // Expected to complete without any storage index update.
+
+        LeaseGrantedMessageResponse resp = respFut.join();
+        assertFalse(resp.accepted()); // A non-forced grant to a non-leader is expected to be declined.
+        assertEquals(ANOTHER_NODE.name(), resp.redirectProposal()); // The redirect must name the reported leader.
+    }
+
+    @Test
+    public void testGrantLeaseRepeat() {
+        long leaseStartTime = 10;
+        leaderElection(ANOTHER_NODE);
+        // Sending a forced grant (force == true) while ANOTHER_NODE is the reported leader.
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), true);
+
+        updateIndex(1L); // Same value as the index reported by the readIndex() mock.
+        assertTrue(respFut0.isDone());
+
+        LeaseGrantedMessageResponse resp0 = respFut0.join();
+        assertTrue(resp0.accepted());
+        assertNull(resp0.redirectProposal());
+
+        // Sending a second grant with force == false whose interval (start + 8 .. start + 18) intersects with the previous one; this
+        // may happen when the placement driver actor has changed.
+        CompletableFuture<LeaseGrantedMessageResponse> respFut1 =
+                sendLeaseGranted(hts(leaseStartTime + 8), hts(leaseStartTime + 18), false);
+
+        assertTrue(respFut1.isDone());
+
+        LeaseGrantedMessageResponse resp1 = respFut1.join();
+        assertFalse(resp1.accepted()); // The non-forced grant is expected to be declined.
+        assertEquals(ANOTHER_NODE.name(), resp1.redirectProposal()); // The redirect must name the leader reported by the election.
+    }
+
+    @Test
+    public void testGrantLeaseToNodeWithExpiredLease() {
+        long leaseStartTime = 10;
+        updateIndex(1L); // Same value as the index reported by the readIndex() mock.
+        leaderElection(LOCAL_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), false);
+
+        assertTrue(respFut0.isDone());
+
+        LeaseGrantedMessageResponse resp0 = respFut0.join();
+        assertTrue(resp0.accepted());
+        assertNull(resp0.redirectProposal());
+
+        CompletableFuture<LeaseGrantedMessageResponse> respFut1 =
+                sendLeaseGranted(hts(leaseStartTime + 11), hts(leaseStartTime + 20), false); // Starts after the first lease ends.
+        assertTrue(respFut1.isDone());
+
+        LeaseGrantedMessageResponse resp1 = respFut1.join();
+        assertTrue(resp1.accepted()); // The local node is still the reported leader, so the new lease is expected to be accepted.
+        assertNull(resp1.redirectProposal());
+    }
+
+    @Test
+    public void testGrantLeaseToNodeWithExpiredLeaseAndAnotherLeaderElected() {
+        long leaseStartTime = 10;
+        updateIndex(1L); // Same value as the index reported by the readIndex() mock.
+        leaderElection(LOCAL_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), false);
+
+        assertTrue(respFut0.isDone());
+
+        LeaseGrantedMessageResponse resp0 = respFut0.join();
+        assertTrue(resp0.accepted());
+        assertNull(resp0.redirectProposal());
+
+        leaderElection(ANOTHER_NODE); // Leadership moves away before the next grant arrives.
+
+        CompletableFuture<LeaseGrantedMessageResponse> respFut1 =
+                sendLeaseGranted(hts(leaseStartTime + 11), hts(leaseStartTime + 20), false); // Starts after the first lease ends.
+        assertTrue(respFut1.isDone());
+
+        LeaseGrantedMessageResponse resp1 = respFut1.join();
+        assertFalse(resp1.accepted()); // The non-forced grant is expected to be declined now.
+        assertEquals(ANOTHER_NODE.name(), resp1.redirectProposal()); // The redirect must name the newly reported leader.
+    }
+
+    @Test
+    public void testForce() {
+        long leaseStartTime = 10;
+        leaderElection(ANOTHER_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), true);
+
+        assertFalse(respFut.isDone()); // The tracker is still at 0 while the readIndex() mock reports 1.
+
+        updateIndex(1L); // The storage index now equals the mocked leader index.
+        assertTrue(respFut.isDone());
+
+        LeaseGrantedMessageResponse resp = respFut.join();
+        assertTrue(resp.accepted()); // A forced grant is expected to be accepted even though another node is the leader.
+        assertNull(resp.redirectProposal());
+
+        // Replica should initiate the leadership transfer.
+        assertEquals(LOCAL_NODE.name(), currentLeader.consistentId()); // currentLeader is set only by the transferLeadership() mock.
+    }
+
+    @Test
+    public void testForceToActualLeader() {
+        long leaseStartTime = 10;
+
+        leaderElection(ANOTHER_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), false);
+
+        assertTrue(respFut0.isDone());
+
+        LeaseGrantedMessageResponse resp0 = respFut0.join();
+        assertFalse(resp0.accepted()); // The non-forced grant to a non-leader is expected to be declined.
+        assertEquals(ANOTHER_NODE.name(), resp0.redirectProposal());
+
+        // After declining the lease grant, local node is elected as a leader and new message with force == true is sent to this
+        // node as actual leader.
+        leaderElection(LOCAL_NODE);
+
+        updateIndex(1L); // Same value as the index reported by the readIndex() mock.
+        CompletableFuture<LeaseGrantedMessageResponse> respFut1 = sendLeaseGranted(hts(leaseStartTime), hts(leaseStartTime + 10), true);
+        assertTrue(respFut1.isDone());
+
+        LeaseGrantedMessageResponse resp1 = respFut1.join();
+
+        assertTrue(resp1.accepted()); // The forced grant to the current leader is expected to be accepted.
+        assertNull(resp1.redirectProposal());
+    }
+
+    @Test
+    public void testIncorrectMessageToReplica() {
+        CompletableFuture<?> future = replica.processPlacementDriverMessage(MSG_FACTORY.leaseGrantedMessageResponse().build());
+        assertTrue(future.isDone()); // A response message is passed where a request is expected; the result must be ready at once.
+        assertTrue(future.isCompletedExceptionally()); // ...and it must be a failure.
+    }
+
+    @Test
+    public void testLongReadIndexWait() {
+        countOfTimeoutExceptionsOnReadIndexToThrow = 100; // The first 100 readIndex() mock calls fail with TimeoutException.
+        updateIndex(1L); // Same value as the index reported by the readIndex() mock.
+        leaderElection(LOCAL_NODE);
+        CompletableFuture<LeaseGrantedMessageResponse> respFut0 = sendLeaseGranted(hts(1), hts(10), false);
+        // Actually, it completes faster because TimeoutException is thrown from mock instantly.
+        assertThat(respFut0, willSucceedIn(5, TimeUnit.SECONDS));
+        assertEquals(0, countOfTimeoutExceptionsOnReadIndexToThrow); // Every planned timeout was consumed by a readIndex() call.
+    }
+}
diff --git a/modules/rest-api/openapi/openapi.yaml b/modules/rest-api/openapi/openapi.yaml
index 53cd30ff45..727589e27c 100644
--- a/modules/rest-api/openapi/openapi.yaml
+++ b/modules/rest-api/openapi/openapi.yaml
@@ -327,7 +327,7 @@ paths:
       parameters: []
       responses:
         "200":
-          description: All statuses returned successfully.
+          description: All statutes returned successfully.
           content:
             application/json:
               schema:
@@ -402,7 +402,7 @@ paths:
           type: string
       responses:
         "200":
-          description: All statuses returned successfully.
+          description: All statutes returned successfully.
           content:
             application/json:
               schema:
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 26ea20da90..a5552a8972 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -122,6 +122,7 @@ dependencies {
     integrationTestImplementation project(':ignite-jdbc')
     integrationTestImplementation project(':ignite-security')
     integrationTestImplementation project(':ignite-catalog')
+    integrationTestImplementation project(':ignite-placement-driver')
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     integrationTestImplementation(testFixtures(project(':ignite-configuration')))
     integrationTestImplementation(testFixtures(project(':ignite-schema')))
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a5ffa5a198..8d03cfd3d9 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.cluster.management.configuration.ClusterManage
 import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.NodeBootstrapConfiguration;
 import org.apache.ignite.internal.configuration.SecurityConfiguration;
@@ -92,6 +93,7 @@ import org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAl
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
@@ -148,6 +150,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.jetbrains.annotations.Nullable;
@@ -729,6 +732,14 @@ public class ItRebalanceDistributedTest {
 
             schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager);
 
+            LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+                    clusterService,
+                    logicalTopologyService,
+                    Loza.FACTORY,
+                    new RaftGroupEventsClientListener()
+            );
+
             tableManager = new TableManager(
                     name,
                     registry,
@@ -747,7 +758,8 @@ public class ItRebalanceDistributedTest {
                     schemaManager,
                     view -> new LocalLogStorageFactory(),
                     new HybridClockImpl(),
-                    new OutgoingSnapshotsManager(clusterService.messagingService())
+                    new OutgoingSnapshotsManager(clusterService.messagingService()),
+                    topologyAwareRaftGroupServiceFactory
             ) {
                 @Override
                 protected TxStateTableStorage createTxStateTableStorage(TableConfiguration tableCfg) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index ace3896bf4..32345e60a7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
@@ -113,6 +114,7 @@ import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlRow;
@@ -333,6 +335,13 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
 
         SchemaManager schemaManager = new SchemaManager(registry, tblCfg, metaStorageMgr);
 
+        TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+                clusterSvc,
+                new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
+                Loza.FACTORY,
+                new RaftGroupEventsClientListener()
+        );
+
         TableManager tableManager = new TableManager(
                 name,
                 registry,
@@ -351,7 +360,8 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
                 schemaManager,
                 view -> new LocalLogStorageFactory(),
                 hybridClock,
-                new OutgoingSnapshotsManager(clusterSvc.messagingService())
+                new OutgoingSnapshotsManager(clusterSvc.messagingService()),
+                topologyAwareRaftGroupServiceFactory
         );
 
         var indexManager = new IndexManager(tblCfg, schemaManager, tableManager);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 004ec601da..52a494ac92 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.app;
 
-import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_NAME;
-import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_SIZE;
-
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -32,9 +29,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -93,6 +87,7 @@ import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
 import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
@@ -126,7 +121,6 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.VaultService;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -277,8 +271,6 @@ public class IgniteImpl implements Ignite {
 
     private final RestAddressReporter restAddressReporter;
 
-    private final ScheduledExecutorService raftExecutorService;
-
     private final DistributedConfigurationUpdater distributedConfigurationUpdater;
     private final CatalogManager catalogManager;
 
@@ -341,12 +333,6 @@ public class IgniteImpl implements Ignite {
 
         RaftConfiguration raftConfiguration = nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY);
 
-        raftExecutorService = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
-                new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterSvc.localConfiguration().getName(),
-                        CLIENT_POOL_NAME), LOG
-                )
-        );
-
         // TODO https://issues.apache.org/jira/browse/IGNITE-19051
         RaftGroupEventsClientListener raftGroupEventsClientListener = new RaftGroupEventsClientListener();
 
@@ -355,7 +341,6 @@ public class IgniteImpl implements Ignite {
                 raftConfiguration,
                 workDir,
                 clock,
-                raftExecutorService,
                 raftGroupEventsClientListener
         );
 
@@ -412,6 +397,13 @@ public class IgniteImpl implements Ignite {
 
         TablesConfiguration tablesConfiguration = clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
 
+        TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+                clusterSvc,
+                logicalTopologyService,
+                Loza.FACTORY,
+                raftGroupEventsClientListener
+        );
+
         // TODO: IGNITE-16985 Design table management flow
         // placementDriverMgr = new PlacementDriverManager(
         //         metaStorageMgr,
@@ -491,7 +483,8 @@ public class IgniteImpl implements Ignite {
                 schemaManager,
                 volatileLogStorageFactoryCreator,
                 clock,
-                outgoingSnapshotsManager
+                outgoingSnapshotsManager,
+                topologyAwareRaftGroupServiceFactory
         );
 
         indexManager = new IndexManager(tablesConfiguration, schemaManager, distributedTblMgr);
@@ -761,7 +754,6 @@ public class IgniteImpl implements Ignite {
     public void stop() {
         lifecycleManager.stopNode();
         restAddressReporter.removeReport();
-        IgniteUtils.shutdownAndAwaitTermination(raftExecutorService, 10, TimeUnit.SECONDS);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index e8c5d5b257..8a80bb3ade 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -72,6 +72,7 @@ dependencies {
     testImplementation project(':ignite-storage-rocksdb')
     testImplementation project(':ignite-cluster-management')
     testImplementation project(':ignite-vault')
+    testImplementation project(':ignite-placement-driver')
     testImplementation libs.jmh.core
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation(testFixtures(project(':ignite-configuration')))
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 4d87d47f24..e0a6bd511f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -573,7 +574,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 schemaManager,
                 null,
                 clock,
-                mock(OutgoingSnapshotsManager.class)
+                mock(OutgoingSnapshotsManager.class),
+                mock(TopologyAwareRaftGroupServiceFactory.class)
         );
 
         tableManager.start();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 05d68dcc4e..5d35035cc1 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -380,7 +380,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
                             partitionDataStorage,
                             storageUpdateHandler,
                             new TestTxStateStorage(),
-                            new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0))
+                            new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)),
+                            new PendingComparableValuesTracker<>(0L)
                     );
 
                     paths.put(listener, path);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 6680ef65cb..37643df468 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.distributed;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -44,6 +45,9 @@ import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -57,6 +61,7 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
 import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -106,6 +111,7 @@ import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
 import org.apache.ignite.network.StaticNodeFinder;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionOptions;
@@ -156,6 +162,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
     protected Map<String, TxStateStorage> txStateStorages;
 
+    private Map<String, ClusterService> clusterServices;
+
     protected final List<ClusterService> cluster = new CopyOnWriteArrayList<>();
 
     private ScheduledThreadPoolExecutor executor;
@@ -227,9 +235,14 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
         var nodeFinder = new StaticNodeFinder(localAddresses);
 
+        // Filled concurrently by the parallel stream below, so the map must be thread-safe.
+        clusterServices = new java.util.concurrent.ConcurrentHashMap<>(nodes);
         nodeFinder.findNodes().parallelStream()
-                .map(addr -> startNode(testInfo, addr.toString(), addr.port(), nodeFinder))
-                .forEach(cluster::add);
+                .forEach(addr -> {
+                    ClusterService svc = startNode(testInfo, addr.toString(), addr.port(), nodeFinder);
+                    cluster.add(svc);
+                    clusterServices.put(svc.topologyService().localMember().name(), svc);
+                });
 
         for (ClusterService node : cluster) {
             assertTrue(waitForTopology(node, nodes, 1000));
@@ -424,11 +437,19 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
                 PendingComparableValuesTracker<HybridTimestamp> safeTime =
                         new PendingComparableValuesTracker<>(clocks.get(assignment).now());
+                PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
 
                 PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(testMpPartStorage);
                 Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.get().id(), pkStorage.get());
                 StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(partId, partitionDataStorage, indexes, dsCfg);
 
+                TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
+                        clusterServices.get(assignment),
+                        logicalTopologyService(clusterServices.get(assignment)),
+                        Loza.FACTORY,
+                        new RaftGroupEventsClientListener()
+                );
+
                 CompletableFuture<Void> partitionReadyFuture = raftServers.get(assignment).startRaftGroupNode(
                         new RaftNodeId(grpId, configuration.peer(assignment)),
                         configuration,
@@ -436,9 +457,11 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                                 partitionDataStorage,
                                 storageUpdateHandler,
                                 txStateStorage,
-                                safeTime
+                                safeTime,
+                                storageIndexTracker
                         ),
-                        RaftGroupEventsListener.noopLsnr
+                        RaftGroupEventsListener.noopLsnr,
+                        topologyAwareRaftGroupServiceFactory
                 ).thenAccept(
                         raftSvc -> {
                             try {
@@ -462,8 +485,10 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                                                 placementDriver,
                                                 storageUpdateHandler,
                                                 peer -> assignment.equals(peer.consistentId()),
-                                                CompletableFuture.completedFuture(schemaManager)
-                                        )
+                                                completedFuture(schemaManager)
+                                        ),
+                                        raftSvc,
+                                        storageIndexTracker
                                 );
                             } catch (NodeStoppingException e) {
                                 fail("Unexpected node stopping", e);
@@ -512,6 +537,30 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
         return clients;
     }
 
+    private LogicalTopologyService logicalTopologyService(ClusterService clusterService) {
+        return new LogicalTopologyService() {
+            @Override
+            public void addEventListener(LogicalTopologyEventListener listener) {
+                // No-op: this stub keeps no listeners and fires no topology events.
+            }
+
+            @Override
+            public void removeEventListener(LogicalTopologyEventListener listener) {
+                // No-op: listeners are never registered by this stub.
+            }
+
+            @Override
+            public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
+                return completedFuture(new LogicalTopologySnapshot(1, clusterService.topologyService().allMembers()));
+            }
+
+            @Override
+            public CompletableFuture<Set<ClusterNode>> validatedNodesOnLeader() {
+                return completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
+            }
+        };
+    }
+
     /**
      * Returns a raft manager for a group.
      *
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index dde0f20c06..83f6dbb59a 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -46,6 +47,7 @@ import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -126,7 +128,12 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
         clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class,
                 (message, sender, correlationId) -> {
                     try {
-                        replicaManager.startReplica(tablePartitionId, request0 -> CompletableFuture.completedFuture(null));
+                        replicaManager.startReplica(
+                                tablePartitionId,
+                                request0 -> CompletableFuture.completedFuture(null),
+                                mock(TopologyAwareRaftGroupService.class),
+                                new PendingComparableValuesTracker<>(0L)
+                        );
                     } catch (NodeStoppingException e) {
                         throw new RuntimeException(e);
                     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4302d5b154..17fdced54e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -100,6 +100,8 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -278,6 +280,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
+    private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory;
+
     /** Partitions storage path. */
     private final Path storagePath;
 
@@ -322,6 +326,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @param schemaManager Schema manager.
      * @param volatileLogStorageFactoryCreator Creator for {@link org.apache.ignite.internal.raft.storage.LogStorageFactory} for
      *         volatile tables.
+     * @param raftGroupServiceFactory Factory that is used for creation of raft group services for replication groups.
      */
     public TableManager(
             String nodeName,
@@ -341,7 +346,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             SchemaManager schemaManager,
             LogStorageFactoryCreator volatileLogStorageFactoryCreator,
             HybridClock clock,
-            OutgoingSnapshotsManager outgoingSnapshotsManager
+            OutgoingSnapshotsManager outgoingSnapshotsManager,
+            TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory
     ) {
         this.tablesCfg = tablesCfg;
         this.clusterService = clusterService;
@@ -358,6 +364,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator;
         this.clock = clock;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
+        this.raftGroupServiceFactory = raftGroupServiceFactory;
 
         clusterNodeResolver = topologyService::getByConsistentId;
 
@@ -690,6 +697,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
 
                 PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+                PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
 
                 CompletableFuture<PartitionStorages> partitionStoragesFut = getOrCreatePartitionStorages(table, partId);
 
@@ -780,7 +788,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                                             partitionDataStorage,
                                                             storageUpdateHandler,
                                                             txStatePartitionStorage,
-                                                            safeTime
+                                                            safeTime,
+                                                            storageIndexTracker
                                                     ),
                                                     new RebalanceRaftGroupEventsListener(
                                                             metaStorageMgr,
@@ -807,7 +816,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 startGroupFut
                         .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
                             try {
-                                return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration);
+                                return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory);
                             } catch (NodeStoppingException ex) {
                                 return failedFuture(ex);
                             }
@@ -845,7 +854,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                                             this::isLocalPeer,
                                                             schemaManager.schemaRegistry(causalityToken, tblId),
                                                             storageReadyLatch
-                                                    )
+                                                    ),
+                                                    updatedRaftGroupService,
+                                                    storageIndexTracker
                                             );
                                         } catch (NodeStoppingException ex) {
                                             throw new AssertionError("Loza was stopped before Table manager", ex);
@@ -1974,6 +1985,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 .anyMatch(assignment -> !stableAssignments.contains(assignment));
 
         var safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+        PendingComparableValuesTracker<Long> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
 
         InternalTable internalTable = tbl.internalTable();
 
@@ -2009,7 +2021,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                 partitionDataStorage,
                                 storageUpdateHandler,
                                 txStatePartitionStorage,
-                                safeTime
+                                safeTime,
+                                storageIndexTracker
                         );
 
                         RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener(
@@ -2058,7 +2071,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                             storageUpdateHandler,
                                             this::isLocalPeer,
                                             completedFuture(schemaManager.schemaRegistry(tblId))
-                                    )
+                                    ),
+                                    (TopologyAwareRaftGroupService) internalTable.partitionRaftGroupService(partId),
+                                    storageIndexTracker
                             );
                         } catch (NodeStoppingException ignored) {
                             // no-op
@@ -2293,7 +2308,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                         replicaMgr.stopReplica(tablePartitionId);
                     } catch (NodeStoppingException e) {
-                        // no-op
+                        // No-op.
                     }
 
                     return tablesByIdVv.get(evt.revision())
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index f9c2fb6bcf..60c9870a29 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -82,22 +82,28 @@ public class PartitionListener implements RaftGroupListener {
     /** Safe time tracker. */
     private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
 
+    /** Storage index tracker. */
+    private final PendingComparableValuesTracker<Long> storageIndexTracker;
+
     /**
      * The constructor.
      *
      * @param partitionDataStorage The storage.
      * @param safeTime Safe time tracker.
+     * @param storageIndexTracker Storage index tracker.
      */
     public PartitionListener(
             PartitionDataStorage partitionDataStorage,
             StorageUpdateHandler storageUpdateHandler,
             TxStateStorage txStateStorage,
-            PendingComparableValuesTracker<HybridTimestamp> safeTime
+            PendingComparableValuesTracker<HybridTimestamp> safeTime,
+            PendingComparableValuesTracker<Long> storageIndexTracker
     ) {
         this.storage = partitionDataStorage;
         this.storageUpdateHandler = storageUpdateHandler;
         this.txStateStorage = txStateStorage;
         this.safeTime = safeTime;
+        this.storageIndexTracker = storageIndexTracker;
 
         // TODO: IGNITE-18502 Implement a pending update storage
         try (PartitionTimestampCursor cursor = partitionDataStorage.getStorage().scan(HybridTimestamp.MAX_VALUE)) {
@@ -179,6 +185,8 @@ public class PartitionListener implements RaftGroupListener {
 
                 safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp());
             }
+
+            storageIndexTracker.update(commandIndex);
         });
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 357b2ab430..0da68a8da7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -387,7 +387,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
             return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary);
         } else if (request instanceof ReplicaSafeTimeSyncRequest) {
-            return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request);
+            return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request, isPrimary);
         } else {
             throw new UnsupportedReplicaRequestException(request.getClass());
         }
@@ -622,9 +622,16 @@ public class PartitionReplicaListener implements ReplicaListener {
      * Handler to process {@link ReplicaSafeTimeSyncRequest}.
      *
      * @param request Request.
+     * @param isPrimary Whether the given replica is primary.
      * @return Future.
      */
-    private CompletableFuture<Void> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
+    private CompletableFuture<Void> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request, Boolean isPrimary) {
+        requireNonNull(isPrimary);
+        // Only the primary replica submits the safe time sync command; any other replica completes right away.
+        if (!isPrimary) {
+            return completedFuture(null);
+        }
+
         return raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(hybridTimestamp(hybridClock.now())).build());
     }
 
@@ -2071,7 +2078,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 }
                             }
                     );
-        } else if (request instanceof ReadOnlyReplicaRequest) {
+        } else if (request instanceof ReadOnlyReplicaRequest || request instanceof ReplicaSafeTimeSyncRequest) {
             return raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm -> isLocalPeerChecker.apply(replicaAndTerm.leader()));
         } else {
             return completedFuture(null);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index 9294016373..5b8396b5d8 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.schema.SchemaManager;
@@ -227,7 +228,8 @@ public class TableManagerDistributionZonesTest extends IgniteAbstractTest {
                 mock(SchemaManager.class),
                 null,
                 null,
-                mock(OutgoingSnapshotsManager.class)
+                mock(OutgoingSnapshotsManager.class),
+                mock(TopologyAwareRaftGroupServiceFactory.class)
         );
     }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 5d727ad316..10592e3eb5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.replicator.ReplicaManager;
@@ -489,7 +490,7 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         TableImpl table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
 
-        verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any());
+        verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any(), any());
 
         TableManager tableManager = tblManagerFut.join();
 
@@ -728,7 +729,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                 sm = new SchemaManager(revisionUpdater, tblsCfg, msm),
                 budgetView -> new LocalLogStorageFactory(),
                 new HybridClockImpl(),
-                new OutgoingSnapshotsManager(clusterService.messagingService())
+                new OutgoingSnapshotsManager(clusterService.messagingService()),
+                mock(TopologyAwareRaftGroupServiceFactory.class)
         ) {
             @Override
             protected MvTableStorage createTableStorage(TableConfiguration tableCfg, TablesConfiguration tablesCfg) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 7861ae3e31..efa846bb69 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -198,7 +198,8 @@ public class PartitionCommandListenerTest {
                 partitionDataStorage,
                 storageUpdateHandler,
                 txStateStorage,
-                safeTimeTracker
+                safeTimeTracker,
+                new PendingComparableValuesTracker<>(0L)
         );
     }
 
@@ -288,7 +289,8 @@ public class PartitionCommandListenerTest {
                 partitionDataStorage,
                 storageUpdateHandler,
                 txStateStorage,
-                new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0))
+                new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0)),
+                new PendingComparableValuesTracker<>(0L)
         );
 
         txStateStorage.lastApplied(3L, 1L);
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 57ebfe0dba..eb49f6da65 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -298,7 +298,8 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 new TestPartitionDataStorage(mvPartStorage),
                 storageUpdateHandler,
                 txStateStorage().getOrCreateTxStateStorage(PART_ID),
-                safeTime
+                safeTime,
+                new PendingComparableValuesTracker<>(0L)
         );
 
         safeTimeUpdaterThread = new Thread(new SafeTimeUpdater(safeTime), "safe-time-updater");
diff --git a/settings.gradle b/settings.gradle
index f33fd8d336..5cb7e03f00 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -65,6 +65,7 @@ include(':packaging')
 include(':ignite-replicator')
 include(':ignite-distribution-zones')
 include(':ignite-placement-driver')
+include(':ignite-placement-driver-api')
 include(':ignite-code-deployment')
 include(':ignite-security')
 include(':ignite-catalog')
@@ -118,6 +119,7 @@ project(":packaging-db").projectDir = file('packaging/db')
 project(":packaging").projectDir = file('packaging')
 project(":ignite-distribution-zones").projectDir = file('modules/distribution-zones')
 project(":ignite-placement-driver").projectDir = file('modules/placement-driver')
+project(":ignite-placement-driver-api").projectDir = file('modules/placement-driver-api')
 project(":ignite-code-deployment").projectDir = file('modules/code-deployment')
 project(":ignite-security").projectDir = file('modules/security')
 project(":ignite-catalog").projectDir = file('modules/catalog')