You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2022/10/27 21:18:35 UTC

[ignite-3] branch ignite-3.0.0-beta1 updated: IGNITE-17263 Leader to replica safe time propagation implemented (#1269)

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
     new dcdd4e13ee IGNITE-17263 Leader to replica safe time propagation implemented  (#1269)
dcdd4e13ee is described below

commit dcdd4e13eef5db0973b49648dfa4e7ade1170f7c
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Fri Oct 28 00:18:30 2022 +0300

    IGNITE-17263 Leader to replica safe time propagation implemented  (#1269)
---
 .../internal/cluster/management/MockNode.java      |   4 +-
 .../management/raft/ItCmgRaftServiceTest.java      |   4 +-
 .../apache/ignite/internal/hlc/HybridClock.java    |  72 +------
 .../hlc/{HybridClock.java => HybridClockImpl.java} |  14 +-
 .../util/PendingComparableValuesTracker.java       | 132 +++++++++++++
 .../apache/ignite/internal/HybridClockTest.java    |   9 +-
 .../apache/ignite/internal/TestHybridClock.java}   |  58 +++---
 .../util/PendingComparableValuesTrackerTest.java   | 134 +++++++++++++
 modules/raft/pom.xml                               |   6 +
 .../apache/ignite/internal/raft/ItLozaTest.java    |   4 +-
 .../internal/raft/ItRaftGroupServiceTest.java      |   4 +-
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |   7 +-
 .../raft/server/ItJraftCounterServerTest.java      | 166 +---------------
 .../apache/ignite/raft/server/ItSafeTimeTest.java  | 142 +++++++++++++
 .../ignite/raft/server/JraftAbstractTest.java      | 219 +++++++++++++++++++++
 .../java/org/apache/ignite/internal/raft/Loza.java |  24 +++
 .../internal/raft/server/RaftGroupOptions.java     |  19 ++
 .../raft/server/ReplicationGroupOptions.java       |  48 +++++
 .../internal/raft/server/impl/JraftServerImpl.java |   4 +
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |   6 +
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  14 ++
 .../apache/ignite/raft/jraft/core/Replicator.java  |   6 +-
 .../ignite/raft/jraft/option/FSMCallerOptions.java |  10 +
 .../ignite/raft/jraft/option/NodeOptions.java      |  17 +-
 .../raft/jraft/util/SafeTimeCandidateManager.java  |  90 +++++++++
 .../org/apache/ignite/internal/raft/LozaTest.java  |   4 +-
 .../ignite/raft/jraft/core/FSMCallerTest.java      |   9 +
 .../jraft/util/SafeTimeCandidatesManagerTest.java  |  51 +++++
 .../apache/ignite/internal/replicator/Replica.java |   8 +
 .../ignite/internal/replicator/ReplicaManager.java |  43 +++-
 .../replicator/command/SafeTimeSyncCommand.java    |  26 +++
 .../replicator/message/ReplicaMessageGroup.java    |   3 +
 ...eGroup.java => ReplicaSafeTimeSyncRequest.java} |  22 +--
 .../ItDistributedConfigurationPropertiesTest.java  |   4 +-
 .../ItDistributedConfigurationStorageTest.java     |   4 +-
 .../storage/ItRebalanceDistributedTest.java        |   9 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   3 +-
 .../internal/runner/app/ItTablesApiTest.java       |   1 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../internal/sql/engine/SqlQueryProcessor.java     |   4 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   3 +-
 .../engine/exec/rel/TableScanExecutionTest.java    |   3 +-
 .../storage/AbstractMvPartitionStorageTest.java    |   3 +-
 .../ItAbstractInternalTableScanTest.java           |   3 +-
 .../ItInternalTableReadOnlyOperationsTest.java     |   3 +-
 .../ItInternalTableReadOnlyScanTest.java           |   3 +-
 .../ignite/distributed/ItTablePersistenceTest.java |   9 +-
 .../distributed/ItTxDistributedTestSingleNode.java |  35 +++-
 .../ignite/internal/table/ItColocationTest.java    |   3 +-
 .../internal/table/distributed/TableManager.java   |  34 +++-
 .../table/distributed/raft/PartitionListener.java  |  12 ++
 .../replicator/PartitionReplicaListener.java       | 163 ++++++++++-----
 .../apache/ignite/internal/table/TxLocalTest.java  |   4 +-
 .../table/distributed/TableManagerTest.java        |   3 +-
 .../PartitionRaftCommandsSerializationTest.java    |   5 +-
 .../raft/PartitionCommandListenerTest.java         |   5 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |   8 +-
 .../replication/PartitionReplicaListenerTest.java  |  11 +-
 .../table/impl/DummyInternalTableImpl.java         |  14 +-
 .../apache/ignite/internal/tx/TxManagerTest.java   |   4 +-
 60 files changed, 1318 insertions(+), 417 deletions(-)

diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 28c95c33b1..3c059d2ae9 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -25,7 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.util.ReverseIterator;
@@ -74,7 +74,7 @@ public class MockNode {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClock());
+        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
 
         this.clusterManager = new ClusterManagementGroupManager(
                 vaultManager,
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 017abd973f..c62e079067 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -46,7 +46,7 @@ import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
 import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -89,7 +89,7 @@ public class ItCmgRaftServiceTest {
 
         Node(TestInfo testInfo, NetworkAddress addr, NodeFinder nodeFinder, Path workDir) {
             this.clusterService = clusterService(testInfo, addr.port(), nodeFinder);
-            this.raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClock());
+            this.raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
         }
 
         void start() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
index 591c96c516..1c7e4fc1f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
@@ -17,63 +17,16 @@
 
 package org.apache.ignite.internal.hlc;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.time.Clock;
-import org.apache.ignite.internal.tostring.S;
-
 /**
  * A Hybrid Logical Clock.
  */
-public class HybridClock {
-    /**
-     * Var handle for {@link #latestTime}.
-     */
-    private static final VarHandle LATEST_TIME;
-
-    static {
-        try {
-            LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClock.class, "latestTime", HybridTimestamp.class);
-        } catch (NoSuchFieldException | IllegalAccessException e) {
-            throw new ExceptionInInitializerError(e);
-        }
-    }
-
-    /** Latest timestamp. */
-    private volatile HybridTimestamp latestTime;
-
-    /**
-     * The constructor which initializes the latest time to current time by system clock.
-     */
-    public HybridClock() {
-        this.latestTime = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), 0);
-    }
-
+public interface HybridClock {
     /**
      * Creates a timestamp for new event.
      *
      * @return The hybrid timestamp.
      */
-    public HybridTimestamp now() {
-        while (true) {
-            long currentTimeMillis = Clock.systemUTC().instant().toEpochMilli();
-
-            // Read the latest time after accessing UTC time to reduce contention.
-            HybridTimestamp latestTime = this.latestTime;
-
-            HybridTimestamp newLatestTime;
-
-            if (latestTime.getPhysical() >= currentTimeMillis) {
-                newLatestTime = latestTime.addTicks(1);
-            } else {
-                newLatestTime = new HybridTimestamp(currentTimeMillis, 0);
-            }
-
-            if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
-                return newLatestTime;
-            }
-        }
-    }
+    HybridTimestamp now();
 
     /**
      * Creates a timestamp for a received event.
@@ -81,24 +34,5 @@ public class HybridClock {
      * @param requestTime Timestamp from request.
      * @return The hybrid timestamp.
      */
-    public HybridTimestamp update(HybridTimestamp requestTime) {
-        while (true) {
-            HybridTimestamp now = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), -1);
-
-            // Read the latest time after accessing UTC time to reduce contention.
-            HybridTimestamp latestTime = this.latestTime;
-
-            HybridTimestamp newLatestTime = HybridTimestamp.max(now, requestTime, latestTime).addTicks(1);
-
-            if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
-                return newLatestTime;
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String toString() {
-        return S.toString(HybridClock.class, this);
-    }
+    HybridTimestamp update(HybridTimestamp requestTime);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
similarity index 88%
copy from modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
copy to modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 591c96c516..c5b256f75a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -23,9 +23,9 @@ import java.time.Clock;
 import org.apache.ignite.internal.tostring.S;
 
 /**
- * A Hybrid Logical Clock.
+ * A Hybrid Logical Clock implementation.
  */
-public class HybridClock {
+public class HybridClockImpl implements HybridClock {
     /**
      * Var handle for {@link #latestTime}.
      */
@@ -33,19 +33,19 @@ public class HybridClock {
 
     static {
         try {
-            LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClock.class, "latestTime", HybridTimestamp.class);
+            LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClockImpl.class, "latestTime", HybridTimestamp.class);
         } catch (NoSuchFieldException | IllegalAccessException e) {
             throw new ExceptionInInitializerError(e);
         }
     }
 
     /** Latest timestamp. */
-    private volatile HybridTimestamp latestTime;
+    protected volatile HybridTimestamp latestTime;
 
     /**
      * The constructor which initializes the latest time to current time by system clock.
      */
-    public HybridClock() {
+    public HybridClockImpl() {
         this.latestTime = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), 0);
     }
 
@@ -88,7 +88,9 @@ public class HybridClock {
             // Read the latest time after accessing UTC time to reduce contention.
             HybridTimestamp latestTime = this.latestTime;
 
-            HybridTimestamp newLatestTime = HybridTimestamp.max(now, requestTime, latestTime).addTicks(1);
+            HybridTimestamp maxLatestTime = HybridTimestamp.max(now, requestTime, latestTime);
+
+            HybridTimestamp newLatestTime = maxLatestTime.addTicks(1);
 
             if (LATEST_TIME.compareAndSet(this, latestTime, newLatestTime)) {
                 return newLatestTime;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
new file mode 100644
index 0000000000..a976c01e51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracker that stores a comparable value internally; this value can only grow when {@link #update(Comparable)} is called. The tracker
+ * makes it possible to wait until a certain value is reached, see {@link #waitFor(Comparable)}.
+ */
+public class PendingComparableValuesTracker<T extends Comparable<T>> {
+    /** Map of comparable values to corresponding futures. */
+    private final ConcurrentSkipListMap<T, Collection<CompletableFuture<Void>>> valueFutures = new ConcurrentSkipListMap<>();
+
+    /** Current value. */
+    public final AtomicReference<T> current;
+
+    /**
+     * Constructor with initial value.
+     *
+     * @param initialValue Initial value.
+     */
+    public PendingComparableValuesTracker(T initialValue) {
+        current = new AtomicReference<>(initialValue);
+    }
+
+    /**
+     * Completes all futures that had been created for values lower than or equal to {@code newValue}, then raises the internal
+     * value to {@code newValue} if it is currently lower.
+     *
+     * @param newValue New value.
+     */
+    public void update(T newValue) {
+        for (Map.Entry<T, Collection<CompletableFuture<Void>>> e : valueFutures.entrySet()) {
+            if (newValue.compareTo(e.getKey()) >= 0) {
+                valueFutures.compute(e.getKey(), (k, v) -> {
+                    if (v != null) {
+                        v.forEach(f -> f.complete(null));
+                    }
+
+                    return null;
+                });
+            } else {
+                break;
+            }
+        }
+
+        while (true) {
+            T current = this.current.get();
+
+            if (newValue.compareTo(current) > 0) {
+                if (this.current.compareAndSet(current, newValue)) {
+                    return;
+                }
+            } else {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Provides a future that is completed when this tracker's internal value reaches the given one. If the internal value is already
+     * greater than or equal to the given one, returns a completed future.
+     *
+     * @param valueToWait Value to wait.
+     * @return Future.
+     */
+    public CompletableFuture<Void> waitFor(T valueToWait) {
+        if (current.get().compareTo(valueToWait) >= 0) {
+            return completedFuture(null);
+        }
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        valueFutures.compute(valueToWait, (k, v) -> {
+            if (v == null) {
+                v = new ConcurrentLinkedDeque<>();
+            }
+
+            v.add(future);
+
+            return v;
+        });
+
+        if (current.get().compareTo(valueToWait) >= 0) {
+            future.complete(null);
+
+            valueFutures.compute(valueToWait, (k, v) -> {
+                if (v == null) {
+                    return null;
+                } else {
+                    v.remove(future);
+                }
+
+                return v;
+            });
+        }
+
+        return future;
+    }
+
+    /**
+     * Returns current internal value.
+     *
+     * @return Current value.
+     */
+    public T current() {
+        return current.get();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
index 5442de531b..c5a5633e81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
@@ -15,13 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.hlc;
+package org.apache.ignite.internal;
 
 import static org.apache.ignite.internal.hlc.HybridClockTestUtils.mockToEpochMilli;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.time.Clock;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
@@ -48,7 +51,7 @@ class HybridClockTest {
     public void testNow() {
         clockMock = mockToEpochMilli(100);
 
-        HybridClock clock = new HybridClock();
+        HybridClock clock = new HybridClockImpl();
 
         assertTimestampEquals(100, new HybridTimestamp(100, 1), clock::now);
 
@@ -66,7 +69,7 @@ class HybridClockTest {
     public void testTick() {
         clockMock = mockToEpochMilli(100);
 
-        HybridClock clock = new HybridClock();
+        HybridClock clock = new HybridClockImpl();
 
         assertTimestampEquals(100, new HybridTimestamp(100, 1),
                 () -> clock.update(new HybridTimestamp(50, 1)));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java b/modules/core/src/test/java/org/apache/ignite/internal/TestHybridClock.java
similarity index 64%
copy from modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
copy to modules/core/src/test/java/org/apache/ignite/internal/TestHybridClock.java
index 591c96c516..f795d74fd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestHybridClock.java
@@ -15,17 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.hlc;
+package org.apache.ignite.internal;
 
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
-import java.time.Clock;
-import org.apache.ignite.internal.tostring.S;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 
 /**
- * A Hybrid Logical Clock.
+ * Test hybrid clock with custom supplier of current time.
  */
-public class HybridClock {
+public class TestHybridClock implements HybridClock {
+    /** Supplier of current time in milliseconds. */
+    private final LongSupplier currentTimeMillisSupplier;
+
+    /** Latest time. */
+    private volatile HybridTimestamp latestTime;
+
     /**
      * Var handle for {@link #latestTime}.
      */
@@ -33,30 +40,22 @@ public class HybridClock {
 
     static {
         try {
-            LATEST_TIME = MethodHandles.lookup().findVarHandle(HybridClock.class, "latestTime", HybridTimestamp.class);
+            LATEST_TIME = MethodHandles.lookup().findVarHandle(TestHybridClock.class, "latestTime", HybridTimestamp.class);
         } catch (NoSuchFieldException | IllegalAccessException e) {
             throw new ExceptionInInitializerError(e);
         }
     }
 
-    /** Latest timestamp. */
-    private volatile HybridTimestamp latestTime;
-
-    /**
-     * The constructor which initializes the latest time to current time by system clock.
-     */
-    public HybridClock() {
-        this.latestTime = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), 0);
+    public TestHybridClock(LongSupplier currentTimeMillisSupplier) {
+        this.currentTimeMillisSupplier = currentTimeMillisSupplier;
+        this.latestTime = new HybridTimestamp(currentTimeMillisSupplier.getAsLong(), 0);
     }
 
-    /**
-     * Creates a timestamp for new event.
-     *
-     * @return The hybrid timestamp.
-     */
+    /** {@inheritDoc} */
+    @Override
     public HybridTimestamp now() {
         while (true) {
-            long currentTimeMillis = Clock.systemUTC().instant().toEpochMilli();
+            long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
 
             // Read the latest time after accessing UTC time to reduce contention.
             HybridTimestamp latestTime = this.latestTime;
@@ -75,18 +74,11 @@ public class HybridClock {
         }
     }
 
-    /**
-     * Creates a timestamp for a received event.
-     *
-     * @param requestTime Timestamp from request.
-     * @return The hybrid timestamp.
-     */
+    /** {@inheritDoc} */
+    @Override
     public HybridTimestamp update(HybridTimestamp requestTime) {
         while (true) {
-            HybridTimestamp now = new HybridTimestamp(Clock.systemUTC().instant().toEpochMilli(), -1);
-
-            // Read the latest time after accessing UTC time to reduce contention.
-            HybridTimestamp latestTime = this.latestTime;
+            HybridTimestamp now = new HybridTimestamp(currentTimeMillisSupplier.getAsLong(), -1);
 
             HybridTimestamp newLatestTime = HybridTimestamp.max(now, requestTime, latestTime).addTicks(1);
 
@@ -95,10 +87,4 @@ public class HybridClock {
             }
         }
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public String toString() {
-        return S.toString(HybridClock.class, this);
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
new file mode 100644
index 0000000000..d61a211569
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for {@link PendingComparableValuesTracker}.
+ */
+public class PendingComparableValuesTrackerTest {
+    @Test
+    public void testSimpleWaitFor() {
+        HybridTimestamp ts = new HybridTimestamp(1, 0);
+
+        PendingComparableValuesTracker<HybridTimestamp> tracker = new PendingComparableValuesTracker<>(ts);
+
+        HybridTimestamp ts1 = new HybridTimestamp(ts.getPhysical() + 1_000_000, 0);
+        HybridTimestamp ts2 = new HybridTimestamp(ts.getPhysical() + 2_000_000, 0);
+        HybridTimestamp ts3 = new HybridTimestamp(ts.getPhysical() + 3_000_000, 0);
+
+        CompletableFuture<Void> f0 = tracker.waitFor(ts1);
+        CompletableFuture<Void> f1 = tracker.waitFor(ts2);
+        CompletableFuture<Void> f2 = tracker.waitFor(ts3);
+
+        assertFalse(f0.isDone());
+        assertFalse(f1.isDone());
+        assertFalse(f2.isDone());
+
+        tracker.update(ts1);
+        assertThat(f0, willCompleteSuccessfully());
+        assertFalse(f1.isDone());
+        assertFalse(f2.isDone());
+
+        tracker.update(ts2);
+        assertThat(f1, willCompleteSuccessfully());
+        assertFalse(f2.isDone());
+
+        tracker.update(ts3);
+        assertThat(f2, willCompleteSuccessfully());
+    }
+
+    @Test
+    public void testMultithreadedWaitFor() throws Exception {
+        HybridClock clock = new HybridClockImpl();
+
+        PendingComparableValuesTracker<HybridTimestamp> tracker = new PendingComparableValuesTracker<>(clock.now());
+
+        int threads = Runtime.getRuntime().availableProcessors() * 2;
+
+        Collection<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> allFutures = new ConcurrentLinkedDeque<>();
+
+        int iterations = 10_000;
+
+        runMultiThreaded(
+                () -> {
+                    List<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> prevFutures = new ArrayList<>();
+                    ThreadLocalRandom random = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < iterations; i++) {
+                        HybridTimestamp now = clock.now();
+                        tracker.update(now);
+                        HybridTimestamp timestampToWait =
+                            new HybridTimestamp(now.getPhysical() + 1, now.getLogical() + random.nextInt(1000));
+
+                        CompletableFuture<Void> future = tracker.waitFor(timestampToWait);
+
+                        IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp> pair = new IgniteBiTuple<>(future, timestampToWait);
+
+                        prevFutures.add(pair);
+                        allFutures.add(pair);
+
+                        if (i % 10 == 0) {
+                            for (Iterator<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> it = prevFutures.iterator();
+                                    it.hasNext();) {
+                                IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp> t = it.next();
+
+                                if (t.get2().compareTo(now) <= 0) {
+                                    assertTrue(t.get1().isDone(), "now=" + now + ", ts=" + t.get2() + ", trackerTs=" + tracker.current());
+
+                                    it.remove();
+                                }
+                            }
+                        }
+                    }
+
+                    return null;
+                },
+                threads,
+                "trackableHybridClockTest"
+        );
+
+        Thread.sleep(5);
+
+        tracker.update(clock.now());
+
+        List<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> uncompleted =
+                allFutures.stream().filter(f -> !f.get1().isDone()).collect(toList());
+
+        assertTrue(uncompleted.isEmpty());
+    }
+}
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index bd68cef47c..7f697ac57f 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -106,6 +106,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- Benchmark dependencies -->
         <dependency>
             <groupId>org.openjdk.jmh</groupId>
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index b4d247f228..90810cbdc0 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -126,7 +126,7 @@ public class ItLozaTest {
 
             CompletableFuture<NetworkMessage> exception = CompletableFuture.failedFuture(new IOException());
 
-            loza = new Loza(service, raftConfiguration, dataPath, new HybridClock());
+            loza = new Loza(service, raftConfiguration, dataPath, new HybridClockImpl());
 
             loza.start();
 
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index 2ba4b40c49..b155af00b8 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -34,7 +34,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -102,7 +102,7 @@ public class ItRaftGroupServiceTest {
         CompletableFuture<RaftGroupService>[] svcFutures = new CompletableFuture[NODES_CNT];
 
         for (int i = 0; i < NODES_CNT; i++) {
-            Loza raftServer = new Loza(clusterServices.get(i), raftConfiguration, workDir.resolve("node" + i), new HybridClock());
+            Loza raftServer = new Loza(clusterServices.get(i), raftConfiguration, workDir.resolve("node" + i), new HybridClockImpl());
 
             raftSrvs.add(raftServer);
 
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 55210e4cd3..5df597fb0b 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -68,6 +68,7 @@ import java.util.function.BiPredicate;
 import java.util.function.BooleanSupplier;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -3779,7 +3780,7 @@ public class ItNodeTest {
         for (PeerId peer : peers) {
             RaftOptions opts = new RaftOptions();
             opts.setElectionHeartbeatFactor(4); // Election timeout divisor.
-            HybridClock clock = new HybridClock();
+            HybridClock clock = new HybridClockImpl();
             assertTrue(cluster.start(peer.getEndpoint(), false, 300, false, null, opts, clock));
         }
 
@@ -3816,11 +3817,11 @@ public class ItNodeTest {
 
                     if (msg.entriesList() == null && msg.data() == null) {
                         heartbeatRequest.set(true);
-                        assertTrue(msg.timestamp() != null);
                     } else {
                         appendEntriesRequest.set(true);
-                        assertTrue(msg.timestamp() == null);
                     }
+
+                    assertTrue(msg.timestamp() != null);
                 } else if (msgs[0] instanceof AppendEntriesResponseImpl) {
                     AppendEntriesResponseImpl msg = (AppendEntriesResponseImpl) msgs[0];
                     if (msg.timestamp() == null) {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index 3a16b68619..ede4466f8b 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -23,7 +23,6 @@ import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
 import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
-import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -43,28 +42,16 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import java.util.stream.Stream;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.ReadCommand;
@@ -76,7 +63,6 @@ import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
 import org.apache.ignite.raft.server.counter.CounterListener;
@@ -95,12 +81,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
  * Jraft server.
  */
 @ExtendWith(WorkDirectoryExtension.class)
-class ItJraftCounterServerTest extends RaftServerAbstractTest {
-    /**
-     * The logger.
-     */
-    private static final IgniteLogger LOG = Loggers.forClass(ItJraftCounterServerTest.class);
-
+class ItJraftCounterServerTest extends JraftAbstractTest {
     /**
      * Counter group name 0.
      */
@@ -111,56 +92,20 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
      */
     private static final TestReplicationGroupId COUNTER_GROUP_1 = new TestReplicationGroupId("counter1");
 
-    /**
-     * The server port offset.
-     */
-    private static final int PORT = 5003;
-
-    /**
-     * The client port offset.
-     */
-    private static final int CLIENT_PORT = 6003;
-
-    /**
-     * Initial configuration.
-     */
-    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
-            .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
-            .map(Peer::new)
-            .collect(Collectors.toUnmodifiableList());
-
     /**
      * Listener factory.
      */
     private Supplier<CounterListener> listenerFactory = CounterListener::new;
 
-    /**
-     * Servers list.
-     */
-    private final List<JraftServerImpl> servers = new ArrayList<>();
-
-    /**
-     * Clients list.
-     */
-    private final List<RaftGroupService> clients = new ArrayList<>();
-
-    /**
-     * Data path.
-     */
-    @WorkDirectory
-    private Path dataPath;
-
-    /** Executor for raft group services. */
-    private ScheduledExecutorService executor;
-
     /**
      * Before each.
      */
     @BeforeEach
+    @Override
     void before() {
         LOG.info(">>>>>>>>>>>>>>> Start test method: {}", testInfo.getTestMethod().orElseThrow().getName());
 
-        executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
+        super.before();
     }
 
     /**
@@ -171,112 +116,9 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
     protected void after() throws Exception {
         super.after();
 
-        shutdownCluster();
-
-        IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
-
-        TestUtils.assertAllJraftThreadsStopped();
-
         LOG.info(">>>>>>>>>>>>>>> End test method: {}", testInfo.getTestMethod().orElseThrow().getName());
     }
 
-    private void shutdownCluster() throws Exception {
-        LOG.info("Start client shutdown");
-
-        Iterator<RaftGroupService> iterClients = clients.iterator();
-
-        while (iterClients.hasNext()) {
-            RaftGroupService client = iterClients.next();
-
-            iterClients.remove();
-
-            client.shutdown();
-        }
-
-        clients.clear();
-
-        LOG.info("Start server shutdown servers={}", servers.size());
-
-        Iterator<JraftServerImpl> iterSrv = servers.iterator();
-
-        while (iterSrv.hasNext()) {
-            JraftServerImpl server = iterSrv.next();
-
-            iterSrv.remove();
-
-            Set<ReplicationGroupId> grps = server.startedGroups();
-
-            for (ReplicationGroupId grp : grps) {
-                server.stopRaftGroup(grp);
-            }
-
-            server.beforeNodeStop();
-
-            server.stop();
-        }
-
-        servers.clear();
-    }
-
-    /**
-     * Starts server.
-     *
-     * @param idx  The index.
-     * @param clo  Init closure.
-     * @param cons Node options updater.
-     * @return Raft server instance.
-     */
-    private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo, Consumer<NodeOptions> cons) {
-        var addr = new NetworkAddress(getLocalAddress(), PORT);
-
-        ClusterService service = clusterService(PORT + idx, List.of(addr), true);
-
-        NodeOptions opts = new NodeOptions();
-
-        cons.accept(opts);
-
-        JraftServerImpl server = new JraftServerImpl(service, dataPath.resolve("node" + idx), opts) {
-            @Override
-            public void stop() throws Exception {
-                servers.remove(this);
-
-                super.stop();
-
-                service.stop();
-            }
-        };
-
-        server.start();
-
-        clo.accept(server);
-
-        servers.add(server);
-
-        assertTrue(waitForTopology(service, servers.size(), 15_000));
-
-        return server;
-    }
-
-    /**
-     * Starts client.
-     *
-     * @param groupId Group id.
-     * @return The client.
-     * @throws Exception If failed.
-     */
-    private RaftGroupService startClient(TestReplicationGroupId groupId) throws Exception {
-        var addr = new NetworkAddress(getLocalAddress(), PORT);
-
-        ClusterService clientNode = clusterService(CLIENT_PORT + clients.size(), List.of(addr), true);
-
-        RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
-                List.of(new Peer(addr)), false, 200, executor).get(3, TimeUnit.SECONDS);
-
-        clients.add(client);
-
-        return client;
-    }
-
     /**
      * Starts a cluster for the test.
      *
@@ -294,8 +136,6 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
         startClient(COUNTER_GROUP_1);
     }
 
-
-
     /**
      * Checks that the number of Disruptor threads does not depend on  count started RAFT nodes.
      */
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
new file mode 100644
index 0000000000..80e3377919
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
+import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.server.ReplicationGroupOptions;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.server.counter.CounterListener;
+import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration test for checking safe time propagation.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItSafeTimeTest extends JraftAbstractTest {
+    /** Raft group id. */
+    private static final ReplicationGroupId RAFT_GROUP_ID = new TestReplicationGroupId("testGroup");
+
+    /** Nodes count. */
+    private static final int NODES = 3;
+
+    /** Hybrid clocks. */
+    private List<HybridClock> clocks = new ArrayList<>();
+
+    /** Safe time trackers, one per node. */
+    private List<PendingComparableValuesTracker<HybridTimestamp>> safeTimeContainers = new ArrayList<>();
+
+    /** Before each. */
+    @BeforeEach
+    @Override
+    void before() {
+        LOG.info(">>>>>>>>>>>>>>> Start test method: {}", testInfo.getTestMethod().orElseThrow().getName());
+
+        super.before();
+    }
+
+    /** After each. */
+    @AfterEach
+    @Override
+    protected void after() throws Exception {
+        super.after();
+
+        LOG.info(">>>>>>>>>>>>>>> End test method: {}", testInfo.getTestMethod().orElseThrow().getName());
+    }
+
+    /**
+     * Starts a cluster for the test.
+     *
+     * @throws Exception If failed.
+     */
+    private void startCluster() throws Exception {
+        for (int i = 0; i < NODES; i++) {
+            HybridClock clock = new TestHybridClock(() -> 1L);
+            PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
+
+            clocks.add(clock);
+            safeTimeContainers.add(safeTime);
+
+            startServer(i,
+                    raftServer -> {
+                        RaftGroupOptions groupOptions = defaults()
+                                .replicationGroupOptions(new ReplicationGroupOptions().safeTime(safeTime));
+
+                        raftServer.startRaftGroup(RAFT_GROUP_ID, new CounterListener(), INITIAL_CONF, groupOptions);
+                    },
+                    opts -> {
+                        opts.setClock(clock);
+                        opts.setSafeTimeTracker(safeTime);
+                    }
+            );
+        }
+
+        startClient(RAFT_GROUP_ID);
+    }
+
+    /**
+     * Tests that the leader's time is propagated to the safe time trackers of the other nodes.
+     */
+    @Test
+    public void test() throws Exception {
+        startCluster();
+
+        RaftGroupService client1 = clients.get(0);
+
+        client1.refreshLeader().get();
+        Peer leader = client1.leader();
+
+        final int leaderIndex = INITIAL_CONF.indexOf(leader);
+
+        assertTrue(leaderIndex >= 0);
+
+        final long leaderPhysicalTime = 100;
+
+        clocks.get(leaderIndex).update(new HybridTimestamp(leaderPhysicalTime, 0));
+
+        client1.run(new IncrementAndGetCommand(1)).get();
+
+        assertTrue(waitForCondition(() -> {
+            for (int i = 0; i < NODES; i++) {
+                // As current time provider for safe time clocks always returns 1,
+                // the only way for physical component to reach leaderPhysicalTime is safe time propagation mechanism.
+                if (i != leaderIndex && safeTimeContainers.get(i).current().getPhysical() != leaderPhysicalTime) {
+                    return false;
+                }
+            }
+
+            return true;
+        }, 2000));
+    }
+}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
new file mode 100644
index 0000000000..f6ca5bbf5f
--- /dev/null
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.server;
+
+import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
+import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract class for raft tests using JRaftServer.
+ */
+public abstract class JraftAbstractTest extends RaftServerAbstractTest {
+    /**
+     * The logger.
+     */
+    protected static final IgniteLogger LOG = Loggers.forClass(JraftAbstractTest.class);
+
+    /**
+     * The server port offset.
+     */
+    protected static final int PORT = 5003;
+
+    /**
+     * The client port offset.
+     */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Initial configuration.
+     */
+    protected static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+            .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+            .map(Peer::new)
+            .collect(Collectors.toUnmodifiableList());
+
+    /**
+     * Servers list.
+     */
+    protected final List<JraftServerImpl> servers = new ArrayList<>();
+
+    /**
+     * Clients list.
+     */
+    protected final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Data path.
+     */
+    @WorkDirectory
+    protected Path dataPath;
+
+    /** Executor for raft group services. */
+    private ScheduledExecutorService executor;
+
+    /**
+     * Before each.
+     */
+    @BeforeEach
+    void before() {
+        executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
+    }
+
+    /**
+     * After each.
+     */
+    @AfterEach
+    @Override
+    protected void after() throws Exception {
+        super.after();
+
+        shutdownCluster();
+
+        IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+
+        TestUtils.assertAllJraftThreadsStopped();
+    }
+
+    protected void shutdownCluster() throws Exception {
+        LOG.info("Start client shutdown");
+
+        Iterator<RaftGroupService> iterClients = clients.iterator();
+
+        while (iterClients.hasNext()) {
+            RaftGroupService client = iterClients.next();
+
+            iterClients.remove();
+
+            client.shutdown();
+        }
+
+        clients.clear();
+
+        LOG.info("Start server shutdown servers={}", servers.size());
+
+        Iterator<JraftServerImpl> iterSrv = servers.iterator();
+
+        while (iterSrv.hasNext()) {
+            JraftServerImpl server = iterSrv.next();
+
+            iterSrv.remove();
+
+            Set<ReplicationGroupId> grps = server.startedGroups();
+
+            for (ReplicationGroupId grp : grps) {
+                server.stopRaftGroup(grp);
+            }
+
+            server.beforeNodeStop();
+
+            server.stop();
+        }
+
+        servers.clear();
+    }
+
+    /**
+     * Starts server.
+     *
+     * @param idx  The index.
+     * @param clo  Init closure.
+     * @param optionsUpdater Node options updater.
+     * @return Raft server instance.
+     */
+    protected JraftServerImpl startServer(int idx, Consumer<RaftServer> clo, Consumer<NodeOptions> optionsUpdater) {
+        var addr = new NetworkAddress(getLocalAddress(), PORT);
+
+        ClusterService service = clusterService(PORT + idx, List.of(addr), true);
+
+        NodeOptions opts = new NodeOptions();
+
+        optionsUpdater.accept(opts);
+
+        JraftServerImpl server = new JraftServerImpl(service, dataPath.resolve("node" + idx), opts) {
+            @Override
+            public void stop() throws Exception {
+                servers.remove(this);
+
+                super.stop();
+
+                service.stop();
+            }
+        };
+
+        server.start();
+
+        clo.accept(server);
+
+        servers.add(server);
+
+        assertTrue(waitForTopology(service, servers.size(), 15_000));
+
+        return server;
+    }
+
+    /**
+     * Starts client.
+     *
+     * @param groupId Group id.
+     * @return The client.
+     * @throws Exception If failed.
+     */
+    protected RaftGroupService startClient(ReplicationGroupId groupId) throws Exception {
+        var addr = new NetworkAddress(getLocalAddress(), PORT);
+
+        ClusterService clientNode = clusterService(CLIENT_PORT + clients.size(), List.of(addr), true);
+
+        RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
+                List.of(new Peer(addr)), false, 200, executor).get(3, TimeUnit.SECONDS);
+
+        clients.add(client);
+
+        return client;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 83238dd978..c73c980037 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -56,6 +58,7 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
 import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -105,16 +108,37 @@ public class Loza implements IgniteComponent {
      * The constructor.
      *
      * @param clusterNetSvc Cluster network service.
+     * @param raftConfiguration Raft configuration.
      * @param dataPath      Data path.
      * @param clock         A hybrid logical clock.
      */
     public Loza(ClusterService clusterNetSvc, RaftConfiguration raftConfiguration, Path dataPath, HybridClock clock) {
+        this(clusterNetSvc, raftConfiguration, dataPath, clock, null);
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param clusterNetSvc     Cluster network service.
+     * @param raftConfiguration Raft configuration.
+     * @param dataPath          Data path.
+     * @param clock             A hybrid logical clock.
+     * @param safeTimeTracker   Safe time tracker.
+     */
+    public Loza(
+            ClusterService clusterNetSvc,
+            RaftConfiguration raftConfiguration,
+            Path dataPath,
+            HybridClock clock,
+            @Nullable PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker
+    ) {
         this.clusterNetSvc = clusterNetSvc;
         this.raftConfiguration = raftConfiguration;
 
         NodeOptions options = new NodeOptions();
 
         options.setClock(clock);
+        options.setSafeTimeTracker(safeTimeTracker);
 
         this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, options);
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index d074ae35e8..f9b4154ffe 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -37,6 +37,9 @@ public class RaftGroupOptions {
     /** Raft meta storage factory. */
     private RaftMetaStorageFactory raftMetaStorageFactory;
 
+    /** Options that are specific for replication group. */
+    private ReplicationGroupOptions replicationGroupOptions;
+
     /**
      * Returns default options as defined by classic Raft (so stores are persistent).
      *
@@ -126,4 +129,20 @@ public class RaftGroupOptions {
 
         return this;
     }
+
+    /**
+     * Replication group options.
+     */
+    public ReplicationGroupOptions replicationGroupOptions() {
+        return replicationGroupOptions;
+    }
+
+    /**
+     * Set the replication group options.
+     */
+    public RaftGroupOptions replicationGroupOptions(ReplicationGroupOptions replicationGroupOptions) {
+        this.replicationGroupOptions = replicationGroupOptions;
+
+        return this;
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/ReplicationGroupOptions.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/ReplicationGroupOptions.java
new file mode 100644
index 0000000000..6301ce8711
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/ReplicationGroupOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.server;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * Options that are specific for replication group.
+ */
+public class ReplicationGroupOptions {
+    /** Safe time. */
+    private PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+    /**
+     * Safe time.
+     */
+    public PendingComparableValuesTracker<HybridTimestamp> safeTime() {
+        return safeTime;
+    }
+
+    /**
+     * Sets the safe time tracker.
+     *
+     * @param safeTime Safe time.
+     * @return This, for chaining.
+     */
+    public ReplicationGroupOptions safeTime(PendingComparableValuesTracker<HybridTimestamp> safeTime) {
+        this.safeTime = safeTime;
+
+        return this;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index f685423809..2cbd7ceaa5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -413,6 +413,10 @@ public class JraftServerImpl implements RaftServer {
 
             nodeOptions.setRpcClient(client);
 
+            if (groupOptions.replicationGroupOptions() != null) {
+                nodeOptions.setSafeTimeTracker(groupOptions.replicationGroupOptions().safeTime());
+            }
+
             NetworkAddress addr = service.topologyService().localMember().address();
 
             var peerId = new PeerId(addr.host(), addr.port(), 0, ElectionPriority.DISABLED);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index b95f4ca175..002c7d3caa 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -58,6 +58,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
 import org.apache.ignite.raft.jraft.util.Utils;
 
 /**
@@ -155,6 +156,7 @@ public class FSMCallerImpl implements FSMCaller {
     private NodeMetrics nodeMetrics;
     private final CopyOnWriteArrayList<LastAppliedLogIndexListener> lastAppliedLogIndexListeners = new CopyOnWriteArrayList<>();
     private RaftMessagesFactory msgFactory;
+    private SafeTimeCandidateManager safeTimeCandidateManager;
 
     public FSMCallerImpl() {
         super();
@@ -186,6 +188,7 @@ public class FSMCallerImpl implements FSMCaller {
         }
         this.error = new RaftException(ErrorType.ERROR_TYPE_NONE);
         this.msgFactory = opts.getRaftMessagesFactory();
+        this.safeTimeCandidateManager = opts.getSafeTimeCandidateManager();
         LOG.info("Starts FSMCaller successfully.");
         return true;
     }
@@ -524,6 +527,9 @@ public class FSMCallerImpl implements FSMCaller {
             this.lastAppliedTerm = lastTerm;
             this.logManager.setAppliedId(lastAppliedId);
             notifyLastAppliedIndexUpdated(lastIndex);
+            if (safeTimeCandidateManager != null) {
+                safeTimeCandidateManager.commitIndex(lastAppliedIndex, lastIndex, lastTerm);
+            }
         }
         finally {
             this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 064546cb0f..3d63f790db 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -116,6 +116,7 @@ import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 import org.apache.ignite.raft.jraft.util.RepeatedTimer;
 import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
 import org.apache.ignite.raft.jraft.util.ThreadHelper;
@@ -137,6 +138,8 @@ public class NodeImpl implements Node, RaftServerService {
 
     private volatile HybridClock clock;
 
+    private volatile SafeTimeCandidateManager safeTimeCandidateManager;
+
     /**
      * Internal states
      */
@@ -822,6 +825,7 @@ public class NodeImpl implements Node, RaftServerService {
         opts.setRaftMessagesFactory(raftOptions.getRaftMessagesFactory());
         opts.setfSMCallerExecutorDisruptor(options.getfSMCallerExecutorDisruptor());
         opts.setGroupId(groupId);
+        opts.setSafeTimeCandidateManager(safeTimeCandidateManager);
 
         return this.fsmCaller.init(opts);
     }
@@ -857,6 +861,9 @@ public class NodeImpl implements Node, RaftServerService {
         Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
         this.serviceFactory = opts.getServiceFactory();
         this.clock = opts.getNodeOptions().getClock();
+        if (opts.getNodeOptions().getSafeTimeTracker() != null) {
+            this.safeTimeCandidateManager = new SafeTimeCandidateManager(opts.getNodeOptions().getSafeTimeTracker());
+        }
         // Term is not an option since changing it is very dangerous
         final long bootstrapLogTerm = opts.getLastLogIndex() > 0 ? 1 : 0;
         final LogId bootstrapId = new LogId(opts.getLastLogIndex(), bootstrapLogTerm);
@@ -955,6 +962,9 @@ public class NodeImpl implements Node, RaftServerService {
         Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
         this.serviceFactory = opts.getServiceFactory();
         this.clock = opts.getClock();
+        if (opts.getSafeTimeTracker() != null) {
+            this.safeTimeCandidateManager = new SafeTimeCandidateManager(opts.getSafeTimeTracker());
+        }
         this.options = opts;
         this.raftOptions = opts.getRaftOptions();
         this.metrics = new NodeMetrics(opts.isEnableMetrics());
@@ -2217,6 +2227,10 @@ public class NodeImpl implements Node, RaftServerService {
                     }
                     entries.add(logEntry);
                 }
+
+                if (safeTimeCandidateManager != null) {
+                    safeTimeCandidateManager.addSafeTimeCandidate(index, request.term(), request.timestamp());
+                }
             }
 
             final FollowerStableClosure closure = new FollowerStableClosure(
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index e029fd25ca..7d2a7402c3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -671,9 +671,7 @@ public class Replicator implements ThreadId.OnError {
     private void sendEmptyEntries(final boolean isHeartbeat,
         final RpcResponseClosure<AppendEntriesResponse> heartBeatClosure) {
         final AppendEntriesRequestBuilder rb = raftOptions.getRaftMessagesFactory().appendEntriesRequest();
-        if (isHeartbeat) {
-            rb.timestamp(options.getNode().clockNow());
-        }
+        rb.timestamp(options.getNode().clockNow());
         if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
             // id is unlock in installSnapshot
             installSnapshot();
@@ -1589,6 +1587,8 @@ public class Replicator implements ThreadId.OnError {
             RecycleUtil.recycle(byteBufList);
         }
 
+        rb.timestamp(this.options.getNode().clockNow());
+
         final AppendEntriesRequest request = rb.build();
         if (LOG.isDebugEnabled()) {
             LOG.debug(
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
index 53a78d0c95..7f37460c75 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
@@ -25,6 +25,7 @@ import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.LogId;
 import org.apache.ignite.raft.jraft.storage.LogManager;
+import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
 
 /**
  * FSM caller options.
@@ -41,6 +42,7 @@ public class FSMCallerOptions {
     private NodeImpl node;
     private RaftMessagesFactory raftMessagesFactory;
     private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
+    private SafeTimeCandidateManager safeTimeCandidateManager;
 
     public String getGroupId() {
         return groupId;
@@ -115,4 +117,12 @@ public class FSMCallerOptions {
     public void setRaftMessagesFactory(RaftMessagesFactory raftMessagesFactory) {
         this.raftMessagesFactory = raftMessagesFactory;
     }
+
+    public SafeTimeCandidateManager getSafeTimeCandidateManager() {
+        return safeTimeCandidateManager; // Null unless a manager was set.
+    }
+
+    public void setSafeTimeCandidateManager(SafeTimeCandidateManager safeTimeCandidateManager) {
+        this.safeTimeCandidateManager = safeTimeCandidateManager; // Read by FSMCallerImpl when it starts.
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 158efeeb95..565f8eab75 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -18,8 +18,11 @@ package org.apache.ignite.raft.jraft.option;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
 import org.apache.ignite.raft.jraft.StateMachine;
 import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -234,7 +237,10 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
 
     /** A hybrid clock */
-    private HybridClock clock = new HybridClock();
+    private HybridClock clock = new HybridClockImpl();
+
+    /** A container for safe time. */
+    private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
 
     /**
      * Amount of Disruptors that will handle the RAFT server.
@@ -597,6 +603,14 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         this.clock = clock;
     }
 
+    public PendingComparableValuesTracker<HybridTimestamp> getSafeTimeTracker() {
+        return safeTimeTracker; // Null unless a tracker was set.
+    }
+
+    public void setSafeTimeTracker(PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker) {
+        this.safeTimeTracker = safeTimeTracker; // NodeImpl creates a SafeTimeCandidateManager only for a non-null tracker.
+    }
+
     @Override
     public NodeOptions copy() {
         final NodeOptions nodeOptions = new NodeOptions();
@@ -633,6 +647,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         nodeOptions.setRpcConnectTimeoutMs(this.getRpcConnectTimeoutMs());
         nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
         nodeOptions.setClock(this.getClock());
+        nodeOptions.setSafeTimeTracker(this.getSafeTimeTracker());
 
         return nodeOptions;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
new file mode 100644
index 0000000000..01c2719339
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidateManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * This manager stores safe time candidates coming with appendEntries requests, and applies them after committing corresponding
+ * indexes. Only candidates with correct term can be applied, other ones are discarded.
+ */
+public class SafeTimeCandidateManager {
+    /** Safe time tracker that is updated with the candidate of every committed index. */
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
+
+    /** Pending candidates: log index -> (term -> safe time candidate). */
+    private final Map<Long, Map<Long, HybridTimestamp>> safeTimeCandidates = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param safeTimeTracker Tracker to update with the safe time of committed indexes.
+     */
+    public SafeTimeCandidateManager(PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker) {
+        this.safeTimeTracker = safeTimeTracker;
+    }
+
+    /**
+     * Adds a safe time candidate; a candidate stored earlier for the same index and term is replaced.
+     *
+     * @param index Corresponding log index.
+     * @param term Corresponding term.
+     * @param safeTime Safe time candidate.
+     */
+    public void addSafeTimeCandidate(long index, long term, HybridTimestamp safeTime) {
+        // The inner map is a plain HashMap, so it is mutated only inside compute(), which runs atomically for the key.
+        safeTimeCandidates.compute(index, (i, candidates) -> {
+            if (candidates == null) {
+                candidates = new HashMap<>();
+            }
+
+            candidates.put(term, safeTime);
+
+            return candidates;
+        });
+    }
+
+    /**
+     * Called on index commit: applies the safe time candidates of all indexes in {@code (prevIndex, index]}.
+     * Indexes without candidates are skipped; candidates stored for other terms are discarded.
+     *
+     * @param prevIndex Previous applied index.
+     * @param index Index.
+     * @param term Term.
+     */
+    public void commitIndex(long prevIndex, long index, long term) {
+        for (long currentIndex = prevIndex + 1; currentIndex <= index; currentIndex++) {
+            Map<Long, HybridTimestamp> candidates = safeTimeCandidates.remove(currentIndex);
+
+            if (candidates == null) {
+                continue;
+            }
+
+            // Only the candidate registered for the committed term is applied.
+            HybridTimestamp safeTime = candidates.get(term);
+
+            assert safeTime != null : "No safe time candidate [index=" + currentIndex + ", term=" + term + ']';
+
+            safeTimeTracker.update(safeTime);
+        }
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index 6cc9758794..4671588ef4 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.function.Supplier;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -65,7 +65,7 @@ public class LozaTest extends IgniteAbstractTest {
         Mockito.doReturn(mock(MessagingService.class)).when(clusterNetSvc).messagingService();
         Mockito.doReturn(mock(TopologyService.class)).when(clusterNetSvc).topologyService();
 
-        Loza loza = new Loza(clusterNetSvc, mock(RaftConfiguration.class), workDir, new HybridClock());
+        Loza loza = new Loza(clusterNetSvc, mock(RaftConfiguration.class), workDir, new HybridClockImpl());
 
         loza.start();
 
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index dc15eadadd..de8f7666ae 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -18,6 +18,8 @@ package org.apache.ignite.raft.jraft.core;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.raft.jraft.Iterator;
 import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -42,6 +44,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.SafeTimeCandidateManager;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -73,6 +76,10 @@ public class FSMCallerTest {
 
     private ExecutorService executor;
 
+    private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+
+    private SafeTimeCandidateManager safeTimeCandidateManager = new SafeTimeCandidateManager(safeTimeTracker);
+
     @BeforeEach
     public void setup() {
         this.fsmCaller = new FSMCallerImpl();
@@ -94,6 +101,7 @@ public class FSMCallerTest {
             1024,
             () -> new FSMCallerImpl.ApplyTask(),
             1));
+        opts.setSafeTimeCandidateManager(safeTimeCandidateManager);
         assertTrue(this.fsmCaller.init(opts));
     }
 
@@ -131,6 +139,7 @@ public class FSMCallerTest {
 
     @Test
     public void testOnCommitted() throws Exception {
+        safeTimeCandidateManager.addSafeTimeCandidate(11, 1, new HybridTimestamp(1, 1));
         final LogEntry log = new LogEntry(EntryType.ENTRY_TYPE_DATA);
         log.getId().setIndex(11);
         log.getId().setTerm(1);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
new file mode 100644
index 0000000000..4f07642fbb
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/SafeTimeCandidatesManagerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Verifies that {@link SafeTimeCandidateManager} publishes a stored candidate once its index is committed.
+ */
+public class SafeTimeCandidatesManagerTest {
+    @Test
+    public void test() {
+        PendingComparableValuesTracker<HybridTimestamp> tracker = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
+        SafeTimeCandidateManager manager = new SafeTimeCandidateManager(tracker);
+
+        // A single candidate is published when its index is committed with the same term.
+        manager.addSafeTimeCandidate(1, 1, new HybridTimestamp(1, 1));
+        manager.commitIndex(0, 1, 1);
+        assertEquals(new HybridTimestamp(1, 1), tracker.current());
+
+        // Index 2 holds candidates for terms 1 and 2; committing it with term 2 must pick the term 2 candidate.
+        manager.addSafeTimeCandidate(2, 1, new HybridTimestamp(10, 1));
+        manager.addSafeTimeCandidate(2, 2, new HybridTimestamp(100, 1));
+        manager.addSafeTimeCandidate(3, 3, new HybridTimestamp(1000, 1));
+        manager.commitIndex(1, 2, 2);
+        assertEquals(new HybridTimestamp(100, 1), tracker.current());
+
+        // The candidate registered above for index 3 stays pending until that index is committed.
+        manager.commitIndex(2, 3, 3);
+        assertEquals(new HybridTimestamp(1000, 1), tracker.current());
+    }
+}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 447cec4c81..a4381bc402 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -59,6 +59,14 @@ public class Replica {
                 replicaGrpId);
 
         return listener.invoke(request);
+    }
 
+    /**
+     * Returns the replica group identity; this id is the same as the considered partition's id.
+     *
+     * @return Group id.
+     */
+    public ReplicationGroupId groupId() {
+        return replicaGrpId;
     }
 }
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index a15fd2b7cc..6d95d5d136 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -17,10 +17,15 @@
 
 package org.apache.ignite.internal.replicator;
 
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -33,7 +38,9 @@ import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -52,6 +59,9 @@ import org.jetbrains.annotations.TestOnly;
  * This class allow to start/stop/get a replica.
  */
 public class ReplicaManager implements IgniteComponent {
+    /** Idle safe time propagation period. */
+    private static final int IDLE_SAFE_TIME_PROPAGATION_PERIOD_SECONDS = 1;
+
     /** The logger. */
     private static final IgniteLogger LOG = Loggers.forClass(ReplicaManager.class);
 
@@ -76,6 +86,10 @@ public class ReplicaManager implements IgniteComponent {
     /** A hybrid logical clock. */
     private final HybridClock clock;
 
+    /** Scheduled executor for idle safe time sync. */
+    private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor =
+            Executors.newScheduledThreadPool(1, new NamedThreadFactory("scheduled-idle-safe-time-sync-thread", LOG));
+
     /** Set of message groups to handler as replica requests. */
     Set<Class<?>> messageGroupsToHandle;
 
@@ -168,7 +182,8 @@ public class ReplicaManager implements IgniteComponent {
      */
     public Replica startReplica(
             ReplicationGroupId replicaGrpId,
-            ReplicaListener listener) throws NodeStoppingException {
+            ReplicaListener listener
+    ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
@@ -187,7 +202,10 @@ public class ReplicaManager implements IgniteComponent {
      * @param listener Replica listener.
      * @return New replica.
      */
-    private Replica startReplicaInternal(ReplicationGroupId replicaGrpId, ReplicaListener listener) {
+    private Replica startReplicaInternal(
+            ReplicationGroupId replicaGrpId,
+            ReplicaListener listener
+    ) {
         var replica = new Replica(replicaGrpId, listener);
 
         Replica previous = replicas.putIfAbsent(replicaGrpId, replica);
@@ -233,6 +251,12 @@ public class ReplicaManager implements IgniteComponent {
     public void start() {
         clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, handler);
         messageGroupsToHandle.forEach(mg -> clusterNetSvc.messagingService().addMessageHandler(mg, handler));
+        scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate(
+                this::idleSafeTimeSync,
+                0,
+                IDLE_SAFE_TIME_PROPAGATION_PERIOD_SECONDS,
+                TimeUnit.SECONDS
+        );
     }
 
     /** {@inheritDoc} */
@@ -244,6 +268,8 @@ public class ReplicaManager implements IgniteComponent {
 
         busyLock.block();
 
+        shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, 10, TimeUnit.SECONDS);
+
         assert replicas.isEmpty() : "There are replicas alive [replicas=" + replicas.keySet() + ']';
     }
 
@@ -341,6 +367,19 @@ public class ReplicaManager implements IgniteComponent {
         }
     }
 
+    /**
+     * Idle safe time sync for replicas. Failures are logged: an exception escaping a fixed-rate task cancels its further runs.
+     */
+    private void idleSafeTimeSync() {
+        replicas.values().forEach(r -> {
+            try {
+                r.processRequest(REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest().groupId(r.groupId()).build());
+            } catch (Exception e) {
+                LOG.error("Failed to sync idle safe time [replicaGrpId=" + r.groupId() + ']', e);
+            }
+        });
+    }
+
     /**
      * Returns started replication groups.
      *
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
new file mode 100644
index 0000000000..9b7d3389a2
--- /dev/null
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.command;
+
+import org.apache.ignite.raft.client.WriteCommand;
+
+/**
+ * Write command without payload that is used to synchronize safe time periodically.
+ */
+public class SafeTimeSyncCommand implements WriteCommand {
+}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
index 37485df750..abac579576 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
@@ -38,4 +38,7 @@ public class ReplicaMessageGroup {
 
     /** Message type for {@link TimestampAwareReplicaResponse}. */
     public static final short TIMESTAMP_AWARE_REPLICA_RESPONSE = 5;
+
+    /** Message type for {@link ReplicaSafeTimeSyncRequest}. */
+    public static final short SAFE_TIME_SYNC_REQUEST = 6;
 }
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
similarity index 51%
copy from modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
copy to modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
index 37485df750..0cd612f9f3 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
@@ -17,25 +17,11 @@
 
 package org.apache.ignite.internal.replicator.message;
 
-import org.apache.ignite.network.annotations.MessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Message group for the replication process.
+ * Request that initiates safe time synchronization.
  */
-@MessageGroup(groupType = 8, groupName = "ReplicaMessages")
-public class ReplicaMessageGroup {
-    /** Message type for {@link ErrorReplicaResponse}. */
-    public static final short ERROR_REPLICA_RESPONSE = 1;
-
-    /** Message type for {@link ReplicaResponse}. */
-    public static final short REPLICA_RESPONSE = 2;
-
-    /** Message type for {@link TimestampAware}. */
-    public static final short TIMESTAMP_AWARE = 3;
-
-    /** Message type for {@link ErrorTimestampAwareReplicaResponse}. */
-    public static final short ERROR_TIMESTAMP_AWARE_REPLICA_RESPONSE = 4;
-
-    /** Message type for {@link TimestampAwareReplicaResponse}. */
-    public static final short TIMESTAMP_AWARE_REPLICA_RESPONSE = 5;
+@Transferable(ReplicaMessageGroup.SAFE_TIME_SYNC_REQUEST)
+public interface ReplicaSafeTimeSyncRequest extends ReplicaRequest {
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 822a1604a0..0cf4b76c0e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -43,7 +43,7 @@ import org.apache.ignite.internal.configuration.storage.ConfigurationStorageList
 import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -118,7 +118,7 @@ public class ItDistributedConfigurationPropertiesTest {
                     new StaticNodeFinder(memberAddrs)
             );
 
-            raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClock());
+            raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
 
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index ce14c25d27..138c9ea6c0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
 import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -91,7 +91,7 @@ public class ItDistributedConfigurationStorageTest {
                     new StaticNodeFinder(List.of(addr))
             );
 
-            raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClock());
+            raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
 
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 7c6a1c1190..b25301643b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -431,15 +432,15 @@ public class ItRebalanceDistributedTest {
 
             lockManager = new HeapLockManager();
 
-            raftManager = new Loza(clusterService, raftConfiguration, dir, new HybridClock());
+            raftManager = new Loza(clusterService, raftConfiguration, dir, new HybridClockImpl());
 
             replicaManager = new ReplicaManager(
                     clusterService,
-                    new HybridClock(),
+                    new HybridClockImpl(),
                     Set.of(TableMessageGroup.class, TxMessageGroup.class)
             );
 
-            HybridClock hybridClock = new HybridClock();
+            HybridClock hybridClock = new HybridClockImpl();
 
             ReplicaService replicaSvc = new ReplicaService(
                     clusterService.messagingService(),
@@ -526,7 +527,7 @@ public class ItRebalanceDistributedTest {
                     metaStorageManager,
                     schemaManager,
                     view -> new LocalLogStorageFactory(),
-                    new HybridClock()
+                    new HybridClockImpl()
             );
         }
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 25c353226f..baaaa2e53d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorag
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -228,7 +229,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
                 nettyBootstrapFactory
         );
 
-        HybridClock hybridClock = new HybridClock();
+        HybridClock hybridClock = new HybridClockImpl();
 
         var raftMgr = new Loza(clusterSvc, raftConfiguration, dir, hybridClock);
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 069cd4713a..8aa8bab7cb 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -393,6 +393,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
      *
      * @throws Exception If failed.
      */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18003")
     @Test
     public void testCreateDropTable() throws Exception {
         clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME)));
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 6e4d0e429d..c9a6dc6312 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
 import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorage;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -281,7 +282,7 @@ public class IgniteImpl implements Ignite {
                 nodeCfgMgr.configurationRegistry().getConfiguration(ComputeConfiguration.KEY)
         );
 
-        clock = new HybridClock();
+        clock = new HybridClockImpl();
 
         raftMgr = new Loza(
                 clusterSvc,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 3f86aae8af..25beeb5419 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -43,7 +43,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.index.event.IndexEvent;
@@ -390,7 +390,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                 })
                 .thenCompose(sqlNode -> {
                     final boolean rwOp = dataModificationOp(sqlNode);
-                    final HybridTimestamp txTime = outerTx != null ? outerTx.readTimestamp() : rwOp ? null : new HybridClock().now();
+                    final HybridTimestamp txTime = outerTx != null ? outerTx.readTimestamp() : rwOp ? null : new HybridClockImpl().now();
 
                     BaseQueryContext ctx = BaseQueryContext.builder()
                             .frameworkConfig(
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 66bef99e1e..adc742225d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.configuration.notifications.ConfigurationStora
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.configuration.testframework.InjectRevisionListenerHolder;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.raft.Loza;
@@ -769,7 +770,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 msm,
                 schemaManager,
                 view -> new LocalLogStorageFactory(),
-                null
+                new HybridClockImpl()
         );
 
         tableManager.start();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanExecutionTest.java
index 47d70d6953..1e86f9b66f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanExecutionTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -148,7 +149,7 @@ public class TableScanExecutionTest extends AbstractExecutionTest {
                     PART_CNT,
                     NetworkAddress::toString,
                     addr -> Mockito.mock(ClusterNode.class),
-                    new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClock()),
+                    new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl()),
                     mock(MvTableStorage.class),
                     mock(TxStateTableStorage.class),
                     replicaSvc,
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 599265679f..d55d99fb8f 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -42,6 +42,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.tx.Timestamp;
@@ -65,7 +66,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
     protected final UUID txId = newTransactionId();
 
     /** Hybrid clock to generate timestamps. */
-    protected final HybridClock clock = new HybridClock();
+    protected final HybridClock clock = new HybridClockImpl();
 
     protected final TestKey key = new TestKey(10, "foo");
     private final TestValue value = new TestValue(20, "bar");
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 155a7e8b9d..2a4477354a 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -84,7 +85,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
     /** Internal table to test. */
     protected InternalTable internalTbl;
 
-    private final HybridClock clock = new HybridClock();
+    private final HybridClock clock = new HybridClockImpl();
 
     /**
      * Prepare test environment using DummyInternalTableImpl and Mocked storage.
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 9e201b9958..a16ab166a9 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -71,7 +72,7 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final HybridClock CLOCK = new HybridClock();
+    private static final HybridClock CLOCK = new HybridClockImpl();
 
     private static final Row ROW_1 = createKeyValueRow(1, 1001);
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 80871da334..15433e7254 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -34,7 +35,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
  */
 @ExtendWith(MockitoExtension.class)
 public class ItInternalTableReadOnlyScanTest extends ItAbstractInternalTableScanTest {
-    private static final HybridClock CLOCK = new HybridClock();
+    private static final HybridClock CLOCK = new HybridClockImpl();
 
     /** {@inheritDoc} */
     @Override
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 2e7725c550..a345ee2b50 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -106,7 +107,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
     @Override
     public void beforeFollowerStop(RaftGroupService service) throws Exception {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClock());
+        TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClockImpl());
 
         managers.add(txManager);
 
@@ -133,7 +134,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
     @Override
     public void afterFollowerStop(RaftGroupService service) throws Exception {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClock());
+        TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClockImpl());
 
         managers.add(txManager);
 
@@ -166,7 +167,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
     @Override
     public void afterSnapshot(RaftGroupService service) throws Exception {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica layer with new transaction protocol.
-        TxManager txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClock());
+        TxManager txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClockImpl());
 
         managers.add(txManager);
 
@@ -228,7 +229,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
                 .map(Map.Entry::getKey)
                 .findAny()
                 .orElseGet(() -> {
-                    TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClock());
+                    TxManagerImpl txManager = new TxManagerImpl(null, new HeapLockManager(), new HybridClockImpl());
 
                     txManager.start(); // Init listener.
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 13e0739c89..00a8c8da2d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -43,6 +43,8 @@ import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Loza;
@@ -83,6 +85,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -226,7 +229,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
             assertTrue(waitForTopology(client, nodes + 1, 1000));
 
-            clientClock = new HybridClock();
+            clientClock = new HybridClockImpl();
 
             log.info("Replica manager has been started, node=[" + client.topologyService().localMember() + ']');
 
@@ -252,11 +255,17 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
         for (int i = 0; i < nodes; i++) {
             ClusterNode node = cluster.get(i).topologyService().localMember();
 
-            HybridClock clock = new HybridClock();
+            HybridClock clock = new HybridClockImpl();
 
             clocks.put(node, clock);
 
-            var raftSrv = new Loza(cluster.get(i), raftConfiguration, workDir.resolve("node" + i), clock);
+            var raftSrv = new Loza(
+                    cluster.get(i),
+                    raftConfiguration,
+                    workDir.resolve("node" + i),
+                    clock,
+                    new PendingComparableValuesTracker<>(clock.now())
+            );
 
             raftSrv.start();
 
@@ -367,6 +376,16 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                 null
         );
 
+        Map<ClusterNode, Function<Peer, Boolean>> isLocalPeerCheckerList = cluster.stream().collect(Collectors.toMap(
+                node -> node.topologyService().localMember(),
+                node -> {
+                        TopologyService ts = node.topologyService();
+
+                        Function<Peer, Boolean> f = peer -> ts.getByAddress(peer.address()).equals(ts.localMember());
+
+                        return f;
+                }
+        ));
 
         Int2ObjectOpenHashMap<RaftGroupService> clients = new Int2ObjectOpenHashMap<>();
 
@@ -425,6 +444,9 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                 ).thenAccept(
                         raftSvc -> {
                             try {
+                                PendingComparableValuesTracker<HybridTimestamp> safeTime =
+                                        new PendingComparableValuesTracker<>(clocks.get(node).now());
+
                                 replicaManagers.get(node).startReplica(
                                         new TablePartitionId(tblId, partId),
                                         new PartitionReplicaListener(
@@ -437,10 +459,13 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                                                 () -> Map.of(pkLocker.id(), pkLocker),
                                                 pkStorage,
                                                 clocks.get(node),
+                                                safeTime,
                                                 txSateStorage,
                                                 topologyServices.get(node),
-                                                placementDriver
-                                        ));
+                                                placementDriver,
+                                                isLocalPeerCheckerList.get(node)
+                                        )
+                                );
                             } catch (NodeStoppingException e) {
                                 fail("Unexpected node stopping", e);
                             }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 5b0cc154f3..f3a4d601cf 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -52,6 +52,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -128,7 +129,7 @@ public class ItColocationTest {
 
         ReplicaService replicaService = Mockito.mock(ReplicaService.class, RETURNS_DEEP_STUBS);
 
-        TxManager txManager = new TxManagerImpl(replicaService,  new HeapLockManager(), new HybridClock()) {
+        TxManager txManager = new TxManagerImpl(replicaService,  new HeapLockManager(), new HybridClockImpl()) {
             @Override
             public CompletableFuture<Void> finish(
                     ReplicationGroupId commitPartition,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 995dd33695..97d7ba0d02 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.causality.VersionedValue;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.EventListener;
@@ -90,6 +91,7 @@ import org.apache.ignite.internal.metastorage.client.WatchListener;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.server.ReplicationGroupOptions;
 import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -135,6 +137,7 @@ import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteNameUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.RebalanceUtil;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -690,6 +693,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 CompletableFuture<Void> startGroupFut = CompletableFuture.completedFuture(null);
 
+                PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
+
                 if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
                     startGroupFut = CompletableFuture
                             .supplyAsync(() -> internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
@@ -731,8 +736,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                         return CompletableFuture.completedFuture(null);
                                     }
 
-                                    RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, tblCfg, partitionStorage,
-                                            newPartAssignment);
+                                    RaftGroupOptions groupOptions = groupOptionsForPartition(
+                                            internalTbl,
+                                            tblCfg,
+                                            partitionStorage,
+                                            newPartAssignment,
+                                            safeTime
+                                    );
 
                                     try {
                                         raftMgr.startRaftGroupNode(
@@ -794,9 +804,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                                             new Lazy<>(() -> table.indexStorageAdapters(partId)
                                                                     .get().get(table.pkId())),
                                                             clock,
+                                                            safeTime,
                                                             internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
                                                             topologyService,
-                                                            placementDriver
+                                                            placementDriver,
+                                                            peer -> clusterNodeResolver.apply(peer.address())
+                                                                    .equals(topologyService.localMember())
                                                     )
                                             );
                                         } catch (NodeStoppingException ex) {
@@ -847,7 +860,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             InternalTable internalTbl,
             ExtendedTableConfiguration tableConfig,
             MvPartitionStorage partitionStorage,
-            Set<ClusterNode> peers
+            Set<ClusterNode> peers,
+            PendingComparableValuesTracker<HybridTimestamp> safeTime
     ) {
         RaftGroupOptions raftGroupOptions;
 
@@ -869,6 +883,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 List.of()
         ));
 
+        raftGroupOptions.replicationGroupOptions(new ReplicationGroupOptions().safeTime(safeTime));
+
         return raftGroupOptions;
     }
 
@@ -1721,6 +1737,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             .filter(p -> !assignments.contains(p))
                             .collect(Collectors.toList());
 
+                    PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(clock.now());
+
                     try {
                         LOG.info("Received update on pending assignments. Check if new raft group should be started"
                                         + " [key={}, partition={}, table={}, localMemberAddress={}]",
@@ -1733,7 +1751,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                     tbl.internalTable(),
                                     tblCfg,
                                     partitionStorage,
-                                    assignments
+                                    assignments,
+                                    safeTime
                             );
 
                             RaftGroupListener raftGrpLsnr = new PartitionListener(
@@ -1777,9 +1796,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                             tbl.indexesLockers(partId),
                                             new Lazy<>(() -> tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
                                             clock,
+                                            safeTime,
                                             tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
                                             raftMgr.topologyService(),
-                                            placementDriver
+                                            placementDriver,
+                                            peer -> clusterNodeResolver.apply(peer.address())
+                                                    .equals(raftMgr.topologyService().localMember())
                                     )
                             );
                         }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 9b85b1ff39..e00f8041a8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -35,6 +35,7 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
@@ -129,6 +130,8 @@ public class PartitionListener implements RaftGroupListener {
                     handleFinishTxCommand((FinishTxCommand) command, commandIndex);
                 } else if (command instanceof TxCleanupCommand) {
                     handleTxCleanupCommand((TxCleanupCommand) command, commandIndex);
+                } else if (command instanceof SafeTimeSyncCommand) {
+                    handleSafeTimeSyncCommand((SafeTimeSyncCommand) command);
                 } else {
                     assert false : "Command was not found [cmd=" + command + ']';
                 }
@@ -271,6 +274,15 @@ public class PartitionListener implements RaftGroupListener {
         });
     }
 
+    /**
+     * Handler for the {@link SafeTimeSyncCommand}; this listener applies nothing for it.
+     *
+     * @param cmd Safe time sync command; not read by this handler.
+     */
+    private void handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd) {
+        // No-op: handling the type keeps the command dispatch from failing its "Command was not found" assertion.
+    }
+
     /** {@inheritDoc} */
     @Override
     public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 018a302666..3f974a9781 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.table.distributed.replicator;
 
+import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
@@ -29,23 +31,27 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
@@ -58,6 +64,7 @@ import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
@@ -80,6 +87,7 @@ import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -87,6 +95,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -135,6 +144,9 @@ public class PartitionReplicaListener implements ReplicaListener {
     /** Hybrid clock. */
     private final HybridClock hybridClock;
 
+    /** Safe time. */
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
     /** Placement Driver. */
     private final PlacementDriver placementDriver;
 
@@ -146,6 +158,11 @@ public class PartitionReplicaListener implements ReplicaListener {
 
     private final Supplier<Map<UUID, IndexLocker>> indexesLockers;
 
+    /**
+     * Function for checking that the given peer is local.
+     */
+    private final Function<Peer, Boolean> isLocalPeerChecker;
+
     /**
      * The constructor.
      *
@@ -156,9 +173,11 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param partId Partition id.
      * @param tableId Table id.
      * @param hybridClock Hybrid clock.
+     * @param safeTime Safe time tracker.
      * @param txStateStorage Transaction state storage.
      * @param topologyService Topology services.
      * @param placementDriver Placement driver.
+     * @param isLocalPeerChecker Function for checking that the given peer is local.
      */
     public PartitionReplicaListener(
             MvPartitionStorage mvDataStorage,
@@ -170,9 +189,11 @@ public class PartitionReplicaListener implements ReplicaListener {
             Supplier<Map<UUID, IndexLocker>> indexesLockers,
             Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
             HybridClock hybridClock,
+            PendingComparableValuesTracker<HybridTimestamp> safeTime,
             TxStateStorage txStateStorage,
             TopologyService topologyService,
-            PlacementDriver placementDriver
+            PlacementDriver placementDriver,
+            Function<Peer, Boolean> isLocalPeerChecker
     ) {
         this.mvDataStorage = mvDataStorage;
         this.raftClient = raftClient;
@@ -183,9 +204,11 @@ public class PartitionReplicaListener implements ReplicaListener {
         this.indexesLockers = indexesLockers;
         this.pkIndexStorage = pkIndexStorage;
         this.hybridClock = hybridClock;
+        this.safeTime = safeTime;
         this.txStateStorage = txStateStorage;
         this.topologyService = topologyService;
         this.placementDriver = placementDriver;
+        this.isLocalPeerChecker = isLocalPeerChecker;
 
         this.replicationGroupId = new TablePartitionId(tableId, partId);
 
@@ -212,7 +235,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         }
 
         return ensureReplicaIsPrimary(request)
-                .thenCompose((ignore) -> {
+                .thenCompose((isPrimary) -> {
                     if (request instanceof ReadWriteSingleRowReplicaRequest) {
                         return processSingleEntryAction((ReadWriteSingleRowReplicaRequest) request);
                     } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
@@ -224,17 +247,19 @@ public class PartitionReplicaListener implements ReplicaListener {
                     } else if (request instanceof ReadWriteScanCloseReplicaRequest) {
                         processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
 
-                        return CompletableFuture.completedFuture(null);
+                        return completedFuture(null);
                     } else if (request instanceof TxFinishReplicaRequest) {
                         return processTxFinishAction((TxFinishReplicaRequest) request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return processTxCleanupAction((TxCleanupReplicaRequest) request);
                     } else if (request instanceof ReadOnlySingleRowReplicaRequest) {
-                        return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                        return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request, isPrimary);
                     } else if (request instanceof ReadOnlyMultiRowReplicaRequest) {
-                        return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                        return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request, isPrimary);
                     } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
-                        return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request);
+                        return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary);
+                    } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+                        return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request);
                     } else {
                         throw new UnsupportedReplicaRequestException(request.getClass());
                     }
@@ -258,7 +283,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                                 return txStateFut.thenApply(txMeta -> new IgniteBiTuple<>(txMeta, null));
                             } else {
-                                return CompletableFuture.completedFuture(
+                                return completedFuture(
                                         new IgniteBiTuple<>(null, topologyService.getByAddress(leaderAddress)));
                             }
                         }
@@ -299,9 +324,15 @@ public class PartitionReplicaListener implements ReplicaListener {
      * Processes retrieve batch for read only transaction.
      *
      * @param request Read only retrieve batch request.
+     * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request) {
+    private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(
+            ReadOnlyScanRetrieveBatchReplicaRequest request,
+            Boolean isPrimary
+    ) {
+        requireNonNull(isPrimary);
+
         UUID txId = request.transactionId();
         int batchCount = request.batchSize();
         HybridTimestamp readTimestamp = request.readTimestamp();
@@ -313,24 +344,30 @@ public class PartitionReplicaListener implements ReplicaListener {
         @SuppressWarnings("resource") PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
                 id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
 
-        while (batchRows.size() < batchCount && cursor.hasNext()) {
-            BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), readTimestamp, () -> cursor.committed(readTimestamp));
+        CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(readTimestamp);
 
-            if (resolvedReadResult != null) {
-                batchRows.add(resolvedReadResult);
+        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
+        return safeReadFuture.thenApplyAsync(ignored -> {
+            while (batchRows.size() < batchCount && cursor.hasNext()) {
+                BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), readTimestamp, () -> cursor.committed(readTimestamp));
+
+                if (resolvedReadResult != null) {
+                    batchRows.add(resolvedReadResult);
+                }
             }
-        }
 
-        return CompletableFuture.completedFuture(batchRows);
+            return batchRows;
+        });
     }
 
     /**
      * Processes single entry request for read only transaction.
      *
      * @param request Read only single entry request.
+     * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+    private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request, Boolean isPrimary) {
         BinaryRow tableRow = request.binaryRow();
         HybridTimestamp readTimestamp = request.readTimestamp();
 
@@ -339,32 +376,52 @@ public class PartitionReplicaListener implements ReplicaListener {
                     format("Unknown single request [actionType={}]", request.requestType()));
         }
 
-        //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
+        CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
 
-        return resolveRowByPk(tableRow, readTimestamp, (rowId, binaryRow) -> CompletableFuture.completedFuture(binaryRow));
+        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
+        return safeReadFuture.thenApplyAsync(ignored -> {
+            //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
+            return resolveRowByPk(tableRow, readTimestamp, (rowId, binaryRow) -> binaryRow);
+        });
     }
 
     /**
      * Processes multiple entries request for read only transaction.
      *
      * @param request Read only multiple entries request.
+     * @param isPrimary {@code true} if this replica is primary, in which case the read does not wait for safe time; must not be null.
      * @return Result future.
      */
-    private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
+    private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request, Boolean isPrimary) {
         if (request.requestType() != RequestType.RO_GET_ALL) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                     format("Unknown single request [actionType={}]", request.requestType()));
         }
 
-        ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
+        CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
 
-        for (BinaryRow searchRow : request.binaryRows()) {
-            BinaryRow row = resolveRowByPk(searchRow, request.readTimestamp(), (rowId, binaryRow) -> binaryRow);
+        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
+        return safeReadFuture.thenApplyAsync(ignored -> {
+            ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
 
-            result.add(row);
-        }
+            for (BinaryRow searchRow : request.binaryRows()) {
+                BinaryRow row = resolveRowByPk(searchRow, request.readTimestamp(), (rowId, binaryRow) -> binaryRow);
+
+                result.add(row);
+            }
+
+            return result;
+        });
+    }
 
-        return CompletableFuture.completedFuture(result);
+    /**
+     * Processes a {@link ReplicaSafeTimeSyncRequest} by running a new {@link SafeTimeSyncCommand} through the raft client.
+     *
+     * @param request Safe time sync request; its contents are not read here.
+     * @return Stage returned by the raft client for the submitted command.
+     */
+    private CompletionStage<Object> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
+        return raftClient.run(new SafeTimeSyncCommand());
     }
 
     /**
@@ -452,7 +509,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 }
             }
 
-            return CompletableFuture.completedFuture(batchRows);
+            return completedFuture(batchRows);
         });
     }
 
@@ -674,7 +731,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 for (BinaryRow searchRow : request.binaryRows()) {
                     rowFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                         if (rowId == null) {
-                            return CompletableFuture.completedFuture(null);
+                            return completedFuture(null);
                         }
 
                         return takeLocksForGet(rowId, txId)
@@ -690,7 +747,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 result.add(rowFuts[idx].join());
                             }
 
-                            return CompletableFuture.completedFuture(result);
+                            return completedFuture(result);
                         });
             }
             case RW_DELETE_ALL: {
@@ -701,7 +758,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 for (BinaryRow searchRow : request.binaryRows()) {
                     rowIdLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                         if (rowId == null) {
-                            return CompletableFuture.completedFuture(null);
+                            return completedFuture(null);
                         }
 
                         return takeLocksForDelete(searchRow, rowId, txId);
@@ -725,7 +782,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     if (rowIdsToDelete.isEmpty()) {
-                        return CompletableFuture.completedFuture(result);
+                        return completedFuture(result);
                     }
 
                     return applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId))
@@ -740,7 +797,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                 for (BinaryRow searchRow : request.binaryRows()) {
                     deleteExactLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                         if (rowId == null) {
-                            return CompletableFuture.completedFuture(null);
+                            return completedFuture(null);
                         }
 
                         return takeLocksForDeleteExact(searchRow, rowId, row, txId);
@@ -763,7 +820,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         }
                     }
 
-                    CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? CompletableFuture.completedFuture(null)
+                    CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? completedFuture(null)
                             : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId));
 
                     return raftFut.thenApply(ignored -> result);
@@ -776,7 +833,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                 for (BinaryRow searchRow : request.binaryRows()) {
                     pkReadLockFuts[i++] = resolveRowByPk(searchRow, txId,
-                            (rowId, row) -> CompletableFuture.completedFuture(rowId));
+                            (rowId, row) -> completedFuture(rowId));
                 }
 
                 return allOf(pkReadLockFuts).thenCompose(ignore -> {
@@ -800,7 +857,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     if (rowsToInsert.isEmpty()) {
-                        return CompletableFuture.completedFuture(result);
+                        return completedFuture(result);
                     }
 
                     CompletableFuture<RowId>[] insertLockFuts = new CompletableFuture[rowsToInsert.size()];
@@ -846,7 +903,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     if (rowsToUpdate.isEmpty()) {
-                        return CompletableFuture.completedFuture(null);
+                        return completedFuture(null);
                     }
 
                     return applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowsToUpdate, txId))
@@ -902,7 +959,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_GET: {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     if (rowId == null) {
-                        return CompletableFuture.completedFuture(null);
+                        return completedFuture(null);
                     }
 
                     return takeLocksForGet(rowId, txId)
@@ -912,7 +969,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_DELETE: {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     if (rowId == null) {
-                        return CompletableFuture.completedFuture(false);
+                        return completedFuture(false);
                     }
 
                     return takeLocksForDelete(searchRow, rowId, txId)
@@ -923,7 +980,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_GET_AND_DELETE: {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     if (rowId == null) {
-                        return CompletableFuture.completedFuture(null);
+                        return completedFuture(null);
                     }
 
                     return takeLocksForDelete(searchRow, rowId, txId)
@@ -934,13 +991,13 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_DELETE_EXACT: {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     if (rowId == null) {
-                        return CompletableFuture.completedFuture(false);
+                        return completedFuture(false);
                     }
 
                     return takeLocksForDeleteExact(searchRow, rowId, row, txId)
                             .thenCompose(validatedRowId -> {
                                 if (validatedRowId == null) {
-                                    return CompletableFuture.completedFuture(false);
+                                    return completedFuture(false);
                                 }
 
                                 return applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, validatedRowId, txId))
@@ -951,7 +1008,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_INSERT: {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     if (rowId != null) {
-                        return CompletableFuture.completedFuture(false);
+                        return completedFuture(false);
                     }
 
                     RowId rowId0 = new RowId(partId);
@@ -997,7 +1054,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_GET_AND_REPLACE: {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     if (rowId == null) {
-                        return CompletableFuture.completedFuture(null);
+                        return completedFuture(null);
                     }
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
@@ -1009,7 +1066,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             case RW_REPLACE_IF_EXIST: {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     if (rowId == null) {
-                        return CompletableFuture.completedFuture(false);
+                        return completedFuture(false);
                     }
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
@@ -1055,7 +1112,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         Collection<IndexLocker> indexes = indexesLockers.get().values();
 
         if (nullOrEmpty(indexes)) {
-            return CompletableFuture.completedFuture(null);
+            return completedFuture(null);
         }
 
         CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
@@ -1072,7 +1129,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         Collection<IndexLocker> indexes = indexesLockers.get().values();
 
         if (nullOrEmpty(indexes)) {
-            return CompletableFuture.completedFuture(null);
+            return completedFuture(null);
         }
 
         CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
@@ -1101,7 +1158,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 .thenApply(exclusiveRowLock -> rowId);
                     }
 
-                    return CompletableFuture.completedFuture(null);
+                    return completedFuture(null);
                 });
     }
 
@@ -1148,13 +1205,13 @@ public class PartitionReplicaListener implements ReplicaListener {
         if (request.requestType() == RequestType.RW_REPLACE) {
             return resolveRowByPk(newRow, txId, (rowId, row) -> {
                 if (rowId == null) {
-                    return CompletableFuture.completedFuture(false);
+                    return completedFuture(false);
                 }
 
                 return takeLocksForReplace(expectedRow, row, newRow, rowId, txId)
                         .thenCompose(validatedRowId -> {
                             if (validatedRowId == null) {
-                                return CompletableFuture.completedFuture(false);
+                                return completedFuture(false);
                             }
 
                             return applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, validatedRowId, newRow, txId))
@@ -1184,7 +1241,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 .thenApply(rowLock -> rowId);
                     }
 
-                    return CompletableFuture.completedFuture(null);
+                    return completedFuture(null);
                 });
     }
 
@@ -1192,9 +1249,9 @@ public class PartitionReplicaListener implements ReplicaListener {
      * Ensure that the primary replica was not changed.
      *
      * @param request Replica request.
-     * @return Future.
+     * @return Future; its result is non-null only for {@link ReadOnlyReplicaRequest}, where {@code true} means this replica is primary.
      */
-    private CompletableFuture<Void> ensureReplicaIsPrimary(ReplicaRequest request) {
+    private CompletableFuture<Boolean> ensureReplicaIsPrimary(ReplicaRequest request) {
         Long expectedTerm;
 
         if (request instanceof ReadWriteReplicaRequest) {
@@ -1219,14 +1276,16 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 Long currentTerm = replicaAndTerm.get2();
 
                                 if (expectedTerm.equals(currentTerm)) {
-                                    return CompletableFuture.completedFuture(null);
+                                    return completedFuture(null);
                                 } else {
                                     return CompletableFuture.failedFuture(new PrimaryReplicaMissException(expectedTerm, currentTerm));
                                 }
                             }
                     );
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm -> isLocalPeerChecker.apply(replicaAndTerm.get1()));
         } else {
-            return CompletableFuture.completedFuture(null);
+            return completedFuture(null);
         }
     }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 92ea9c80bf..0c6a57dce7 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.when;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -77,7 +77,7 @@ public class TxLocalTest extends TxAbstractTest {
             }
         ).when(replicaSvc).invoke(any(), any());
 
-        txManager = new TxManagerImpl(replicaSvc, lockManager, new HybridClock());
+        txManager = new TxManagerImpl(replicaSvc, lockManager, new HybridClockImpl());
 
         igniteTransactions = new IgniteTransactionsImpl(txManager);
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index da6c0159ab..a0f11c1468 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.configuration.notifications.ConfigurationStora
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.configuration.testframework.InjectRevisionListenerHolder;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -761,7 +762,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                 msm,
                 sm = new SchemaManager(revisionUpdater, tblsCfg),
                 budgetView -> new LocalLogStorageFactory(),
-                null
+                new HybridClockImpl()
         );
 
         sm.start();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index f5435165e7..8d818f7ac6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
@@ -144,7 +145,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
 
     @Test
     public void testTxCleanupCommand() throws Exception {
-        HybridClock clock = new HybridClock();
+        HybridClock clock = new HybridClockImpl();
 
         TxCleanupCommand cmd = new TxCleanupCommand(UUID.randomUUID(), true, clock.now());
 
@@ -157,7 +158,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
 
     @Test
     public void testFinishTxCommand() throws Exception {
-        HybridClock clock = new HybridClock();
+        HybridClock clock = new HybridClockImpl();
         ArrayList<ReplicationGroupId> grps = new ArrayList<>(10);
 
         for (int i = 0; i < 10; i++) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index db1a922efe..71ef933498 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -39,6 +39,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -93,7 +94,7 @@ public class PartitionCommandListenerTest {
     );
 
     /** Hybrid clock. */
-    private static final HybridClock CLOCK = new HybridClock();
+    private static final HybridClock CLOCK = new HybridClockImpl();
 
     /** Table command listener. */
     private PartitionListener commandListener;
@@ -130,7 +131,7 @@ public class PartitionCommandListenerTest {
         commandListener = new PartitionListener(
                 mvPartitionStorage,
                 new TestConcurrentHashMapTxStateStorage(),
-                new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClock()),
+                new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()),
                 () -> Map.of(pkStorage.id(), pkStorage)
         );
     }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index b3f59b0f1c..facbd001da 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -34,6 +34,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.TopologyService;
@@ -91,7 +93,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
     private static final UUID HASH_INDEX_ID = new UUID(0L, 2L);
     private static final UUID SORTED_INDEX_ID = new UUID(0L, 3L);
     private static final UUID TRANSACTION_ID = Timestamp.nextVersion().toUuid();
-    private static final HybridClock CLOCK = new HybridClock();
+    private static final HybridClock CLOCK = new HybridClockImpl();
     private static final LockManager LOCK_MANAGER = new HeapLockManager();
     private static final TablePartitionId PARTITION_ID = new TablePartitionId(TABLE_ID, PART_ID);
     private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
@@ -170,9 +172,11 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
                 ),
                 pkStorage,
                 CLOCK,
+                new PendingComparableValuesTracker<>(CLOCK.now()),
                 new TestConcurrentHashMapTxStateStorage(),
                 mock(TopologyService.class),
-                mock(PlacementDriver.class)
+                mock(PlacementDriver.class),
+                peer -> true
         );
 
         kvMarshaller = new ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, Integer.class);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index f8623eb19c..f2de21750d 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -34,6 +34,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
 import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
@@ -101,7 +103,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     private static final ReplicationGroupId grpId = new TablePartitionId(tblId, partId);
 
     /** Hybrid clock. */
-    private static final HybridClock clock = new HybridClock();
+    private static final HybridClock clock = new HybridClockImpl();
 
     /** The storage stores transaction states. */
     private static final TestConcurrentHashMapTxStateStorage txStateStorage = new TestConcurrentHashMapTxStateStorage();
@@ -186,6 +188,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             return CompletableFuture.completedFuture(txMeta);
         });
 
+        PendingComparableValuesTracker safeTimeClock = mock(PendingComparableValuesTracker.class);
+        when(safeTimeClock.waitFor(any())).thenReturn(CompletableFuture.completedFuture(null));
+
         UUID indexId = UUID.randomUUID();
 
         BinaryTupleSchema pkSchema = BinaryTupleSchema.create(new Element[]{
@@ -214,9 +219,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 () -> Map.of(pkLocker.id(), pkLocker),
                 pkStorage,
                 clock,
+                safeTimeClock,
                 txStateStorage,
                 topologySrv,
-                placementDriver
+                placementDriver,
+                peer -> true
         );
 
         marshallerFactory = new ReflectionMarshallerFactory();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 2ec7c99459..3f202bb5f0 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
@@ -58,6 +59,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
 import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
@@ -139,11 +141,11 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 1,
                 NetworkAddress::toString,
                 addr -> Mockito.mock(ClusterNode.class),
-                txManager == null ? new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClock()) : txManager,
+                txManager == null ? new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl()) : txManager,
                 mock(MvTableStorage.class),
                 mock(TxStateTableStorage.class),
                 replicaSvc,
-                new HybridClock()
+                new HybridClockImpl()
         );
         RaftGroupService svc = partitionMap.get(0);
 
@@ -227,6 +229,8 @@ public class DummyInternalTableImpl extends InternalTableImpl {
 
         IndexLocker pkLocker = new HashIndexLocker(indexId, true, this.txManager.lockManager(), row2tuple);
 
+        HybridClock clock = new HybridClockImpl();
+
         replicaListener = new PartitionReplicaListener(
                 mvPartStorage,
                 partitionMap.get(0),
@@ -236,10 +240,12 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 tableId,
                 () -> Map.of(pkLocker.id(), pkLocker),
                 pkStorage,
-                new HybridClock(),
+                clock,
+                new PendingComparableValuesTracker<>(clock.now()),
+                null,
                 null,
                 null,
-                null
+                peer -> true
         );
 
         partitionListener = new PartitionListener(
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 5a6ffa759f..54738fb3e8 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 
 import java.util.Objects;
 import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -67,7 +67,7 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         replicaService = Mockito.mock(ReplicaService.class, RETURNS_DEEP_STUBS);
 
-        txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClock());
+        txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
     }
 
     @Test