You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/08/29 11:59:40 UTC

[ignite-3] branch main updated: IGNITE-20290 Command reordering wrt safe time in MetaStorage (#2506)

This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e26fc8016d IGNITE-20290 Command reordering wrt safe time in MetaStorage (#2506)
e26fc8016d is described below

commit e26fc8016ddd4ebb99596909157f2f0cdb925ebf
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Aug 29 15:59:32 2023 +0400

    IGNITE-20290 Command reordering wrt safe time in MetaStorage (#2506)
    
    * No mechanism exists to forbid reordering of idle safe time propagation commands wrt other MetaStorage write commands, it's added in this commit
    * Idle safe time commands are triggered too early, this is fixed
---
 .../impl/ItMetaStorageManagerImplTest.java         | 52 +++++++++++++++++++++-
 .../ItMetaStorageMultipleNodesAbstractTest.java    |  2 +-
 ...MetaStorageSafeTimePropagationAbstractTest.java | 13 ++++--
 .../metastorage/command/SyncTimeCommand.java       | 14 +-----
 .../metastorage/impl/MetaStorageManagerImpl.java   | 35 ++++++++++++---
 .../metastorage/impl/MetaStorageServiceImpl.java   |  2 +-
 .../metastorage/server/KeyValueStorage.java        |  7 +++
 .../server/OnRevisionAppliedCallback.java          | 15 +++++--
 .../metastorage/server/WatchProcessor.java         | 21 ++++++++-
 .../server/persistence/RocksDbKeyValueStorage.java | 13 ++++++
 .../server/raft/MetaStorageWriteHandler.java       | 30 ++++++++-----
 .../server/BasicOperationsKeyValueStorageTest.java | 28 ++++++++++--
 .../metastorage/server/WatchProcessorTest.java     | 10 ++---
 .../server/SimpleInMemoryKeyValueStorage.java      | 11 +++++
 14 files changed, 202 insertions(+), 51 deletions(-)

diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index d2dbc13721..091ed4c1fe 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -40,6 +41,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
@@ -57,6 +61,7 @@ import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -95,7 +100,7 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
     void setUp(
             TestInfo testInfo,
             @InjectConfiguration RaftConfiguration raftConfiguration,
-            @InjectConfiguration MetaStorageConfiguration metaStorageConfiguration
+            @InjectConfiguration("mock.idleSyncTimeInterval = 100") MetaStorageConfiguration metaStorageConfiguration
     ) {
         var addr = new NetworkAddress("localhost", 10_000);
 
@@ -262,4 +267,49 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
 
         assertThat(revisionCapture.getAllValues(), is(List.of(revision + 1)));
     }
+
+    /**
+     * Tests that idle safe time propagation does not advance safe time while watches of a normal command are being executed.
+     */
+    @Test
+    void testIdleSafeTimePropagationAndNormalSafeTimePropagationInteraction(TestInfo testInfo) throws Exception {
+        var key = new ByteArray("foo");
+        byte[] value = "bar".getBytes(UTF_8);
+
+        AtomicBoolean watchCompleted = new AtomicBoolean(false);
+        CompletableFuture<HybridTimestamp> watchEventTsFuture = new CompletableFuture<>();
+
+        metaStorageManager.registerExactWatch(key, new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                watchEventTsFuture.complete(event.timestamp());
+
+                // The future will set the flag and complete after 300ms to allow idle safe time mechanism (which ticks each 100ms)
+                // to advance SafeTime (if there is still a bug for which this test is written).
+                return waitFor(300, TimeUnit.MILLISECONDS)
+                        .whenComplete((res, ex) -> watchCompleted.set(true));
+            }
+
+            @Override
+            public void onError(Throwable e) {
+            }
+        });
+
+        metaStorageManager.put(key, value);
+
+        ClusterTime clusterTime = metaStorageManager.clusterTime();
+
+        assertThat(watchEventTsFuture, willSucceedIn(5, TimeUnit.SECONDS));
+
+        HybridTimestamp watchEventTs = watchEventTsFuture.join();
+        assertThat(clusterTime.waitFor(watchEventTs), willCompleteSuccessfully());
+
+        assertThat("Safe time is advanced too early", watchCompleted.get(), is(true));
+    }
+
+    private static CompletableFuture<Void> waitFor(int timeout, TimeUnit unit) {
+        return new CompletableFuture<Void>()
+                .orTimeout(timeout, unit)
+                .exceptionally(ex -> null);
+    }
 }
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 1e039b2bf8..082c3b27f1 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -234,7 +234,7 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
 
     private final List<Node> nodes = new ArrayList<>();
 
-    private Node startNode(TestInfo testInfo) throws NodeStoppingException {
+    private Node startNode(TestInfo testInfo) {
         var nodeFinder = new StaticNodeFinder(List.of(new NetworkAddress("localhost", 10_000)));
 
         ClusterService clusterService = ClusterServiceTestUtils.clusterService(testInfo, 10_000 + nodes.size(), nodeFinder);
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
index 47b8ad923a..1baafe9b59 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest;
+import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.junit.jupiter.api.AfterEach;
@@ -45,10 +46,16 @@ public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends Abstr
 
     @BeforeEach
     public void startWatches() {
-        storage.startWatches(1, (e, t) -> {
-            time.updateSafeTime(t);
+        storage.startWatches(1, new OnRevisionAppliedCallback() {
+            @Override
+            public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+                time.updateSafeTime(newSafeTime);
+            }
 
-            return CompletableFuture.completedFuture(null);
+            @Override
+            public CompletableFuture<Void> onRevisionApplied(WatchEvent e) {
+                return CompletableFuture.completedFuture(null);
+            }
         });
     }
 
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
index 4032f0f8f2..fadd2deee6 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
@@ -17,25 +17,13 @@
 
 package org.apache.ignite.internal.metastorage.command;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Command that initiates idle safe time synchronization.
  */
 @Transferable(MetastorageCommandsMessageGroup.SYNC_TIME)
-public interface SyncTimeCommand extends WriteCommand {
-    /** New safe time. */
-    long safeTimeLong();
-
+public interface SyncTimeCommand extends MetaStorageWriteCommand {
     /** Term of the initiator. */
     long initiatorTerm();
-
-    /** New safe time. */
-    default HybridTimestamp safeTime() {
-        return hybridTimestamp(safeTimeLong());
-    }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 8d810e6044..b3c41997cf 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
@@ -449,7 +450,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
             return recoveryFinishedFuture
                     .thenAccept(revision -> inBusyLock(busyLock, () -> {
                         // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault.
-                        storage.startWatches(revision + 1, this::onRevisionApplied);
+                        storage.startWatches(revision + 1, new OnRevisionAppliedCallback() {
+                            @Override
+                            public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+                                MetaStorageManagerImpl.this.onSafeTimeAdvanced(newSafeTime);
+                            }
+
+                            @Override
+                            public CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent) {
+                                return MetaStorageManagerImpl.this.onRevisionApplied(watchEvent);
+                            }
+                        });
                     }))
                     .whenComplete((v, e) -> {
                         if (e == null) {
@@ -817,20 +828,32 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
         }
     }
 
+    private void onSafeTimeAdvanced(HybridTimestamp time) {
+        assert time != null;
+
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping advancing Safe Time because the node is stopping");
+
+            return;
+        }
+
+        try {
+            clusterTime.updateSafeTime(time);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     /**
      * Saves processed Meta Storage revision and corresponding entries to the Vault.
      */
-    private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp time) {
-        assert time != null;
-
+    private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent) {
         if (!busyLock.enterBusy()) {
             LOG.info("Skipping applying MetaStorage revision because the node is stopping");
 
             return completedFuture(null);
         }
 
-        clusterTime.updateSafeTime(time);
-
         try {
             CompletableFuture<Void> saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision()));
 
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 4f32decdff..5637cb274a 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -279,7 +279,7 @@ public class MetaStorageServiceImpl implements MetaStorageService {
      */
     public CompletableFuture<Void> syncTime(HybridTimestamp safeTime, long term) {
         SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand()
-                .safeTimeLong(safeTime.longValue())
+                .initiatorTimeLong(safeTime.longValue())
                 .initiatorTerm(term)
                 .build();
 
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 5b29bcab25..de625f477e 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -312,4 +312,11 @@ public interface KeyValueStorage extends ManuallyCloseable {
 
     /** Explicitly notifies revision update listeners. */
     CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision);
+
+    /**
+     * Advances MetaStorage Safe Time to a new value without creating a new revision.
+     *
+     * @param newSafeTime New Safe Time value.
+     */
+    void advanceSafeTime(HybridTimestamp newSafeTime);
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index 93a2fc1c37..b514048950 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -22,16 +22,23 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 
 /**
- * Interface for declaring callbacks that get called after all Meta Storage watches have been notified of a particular revision.
+ * Interface for declaring callbacks that get called after all Meta Storage watches have been notified of a particular revision
+ * and/or when SafeTime gets advanced.
  */
-@FunctionalInterface
 public interface OnRevisionAppliedCallback {
+    /**
+     * Invoked whenever MetaStorage Safe Time gets advanced (either because a write command is applied,
+     * together with all watches that process it, or because idle safe time mechanism advanced Safe Time).
+     *
+     * @param newSafeTime New safe time value.
+     */
+    void onSafeTimeAdvanced(HybridTimestamp newSafeTime);
+
     /**
      * Notifies of completion of processing of Meta Storage watches for a particular revision.
      *
      * @param watchEvent Event with modified Meta Storage entries processed by at least one Watch.
-     * @param newSafeTime Safe time of the applied revision.
      * @return Future that represents the state of the execution of the callback.
      */
-    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp newSafeTime);
+    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index f334dff57b..5a58278f48 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -122,7 +122,7 @@ public class WatchProcessor implements ManuallyCloseable {
     }
 
     /**
-     * Notifies registered watch about an update event.
+     * Notifies registered watches about an update event.
      */
     public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp time) {
         assert time != null;
@@ -244,7 +244,9 @@ public class WatchProcessor implements ManuallyCloseable {
 
             var event = new WatchEvent(acceptedEntries, revision, time);
 
-            return revisionCallback.onRevisionApplied(event, time)
+            revisionCallback.onSafeTimeAdvanced(time);
+
+            return revisionCallback.onRevisionApplied(event)
                     .whenComplete((ignored, e) -> {
                         if (e != null) {
                             LOG.error("Error occurred when notifying watches", e);
@@ -257,6 +259,21 @@ public class WatchProcessor implements ManuallyCloseable {
         }
     }
 
+    /**
+     * Advances safe time without notifying watches (as there is no new revision).
+     */
+    public void advanceSafeTime(HybridTimestamp time) {
+        assert time != null;
+
+        notificationFuture = notificationFuture
+                .thenRunAsync(() -> revisionCallback.onSafeTimeAdvanced(time), watchExecutor)
+                .whenComplete((ignored, e) -> {
+                    if (e != null) {
+                        LOG.error("Error occurred when notifying safe time advanced callback", e);
+                    }
+                });
+    }
+
     @Override
     public void close() {
         notificationFuture.cancel(true);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index abb2d85848..d691d73c14 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -1664,4 +1664,17 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) {
         return watchProcessor.notifyUpdateRevisionListeners(newRevision);
     }
+
+    @Override
+    public void advanceSafeTime(HybridTimestamp newSafeTime) {
+        rwLock.writeLock().lock();
+
+        try {
+            if (recoveryStatus.get() == RecoveryStatus.DONE) {
+                watchProcessor.advanceSafeTime(newSafeTime);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 8f836a1963..a8f98b58ba 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -83,18 +83,18 @@ public class MetaStorageWriteHandler {
             if (command instanceof MetaStorageWriteCommand) {
                 var cmdWithTime = (MetaStorageWriteCommand) command;
 
-                HybridTimestamp safeTime = cmdWithTime.safeTime();
+                if (command instanceof SyncTimeCommand) {
+                    var syncTimeCommand = (SyncTimeCommand) command;
 
-                handleWriteWithTime(clo, cmdWithTime, safeTime);
-            } else if (command instanceof SyncTimeCommand) {
-                var syncTimeCommand = (SyncTimeCommand) command;
+                    // Ignore the command if it has been sent by a stale leader.
+                    if (clo.term() != syncTimeCommand.initiatorTerm()) {
+                        clo.result(null);
 
-                // Ignore the command if it has been sent by a stale leader.
-                if (clo.term() == syncTimeCommand.initiatorTerm()) {
-                    clusterTime.updateSafeTime(syncTimeCommand.safeTime());
+                        return;
+                    }
                 }
 
-                clo.result(null);
+                handleWriteWithTime(clo, cmdWithTime);
             } else {
                 assert false : "Command was not found [cmd=" + command + ']';
             }
@@ -118,9 +118,10 @@ public class MetaStorageWriteHandler {
      *
      * @param clo Command closure.
      * @param command Command.
-     * @param opTime Command's time.
      */
-    private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command, HybridTimestamp opTime) {
+    private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command) {
+        HybridTimestamp opTime = command.safeTime();
+
         if (command instanceof PutCommand) {
             PutCommand putCmd = (PutCommand) command;
 
@@ -177,6 +178,10 @@ public class MetaStorageWriteHandler {
             MultiInvokeCommand cmd = (MultiInvokeCommand) command;
 
             clo.result(storage.invoke(toIf(cmd.iif()), opTime));
+        } else if (command instanceof SyncTimeCommand) {
+            storage.advanceSafeTime(command.safeTime());
+
+            clo.result(null);
         }
     }
 
@@ -286,7 +291,10 @@ public class MetaStorageWriteHandler {
         }
     }
 
-    void beforeApply(Command command) {
+    // TODO: IGNITE-20290 - This is insufficient, we must do this in single thread before saving the command to the RAFT log.
+    // Synchronized to make sure no reodering happens as RaftGroupListener#beforeApply() might be invoked in different threads
+    // for different commands.
+    synchronized void beforeApply(Command command) {
         if (command instanceof MetaStorageWriteCommand) {
             // Initiator sends us a timestamp to adjust to.
             // Alter command by setting safe time based on the adjusted clock.
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 198ade0395..5486b2e81a 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1969,7 +1969,17 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
 
         long appliedRevision = storage.revision();
 
-        storage.startWatches(1, (event, ts) -> completedFuture(null));
+        storage.startWatches(1, new OnRevisionAppliedCallback() {
+            @Override
+            public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+                // No-op.
+            }
+
+            @Override
+            public CompletableFuture<Void> onRevisionApplied(WatchEvent event) {
+                return completedFuture(null);
+            }
+        });
 
         CompletableFuture<byte[]> fut = new CompletableFuture<>();
 
@@ -2308,7 +2318,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
 
         OnRevisionAppliedCallback mockCallback = mock(OnRevisionAppliedCallback.class);
 
-        when(mockCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null));
+        when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
 
         storage.startWatches(1, mockCallback);
 
@@ -2320,7 +2330,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
 
         verify(mockListener3, timeout(10_000)).onUpdate(any());
 
-        verify(mockCallback, never()).onRevisionApplied(any(), any());
+        verify(mockCallback, never()).onRevisionApplied(any());
     }
 
     @Test
@@ -2505,7 +2515,17 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
             }
         });
 
-        storage.startWatches(1, (event, ts) -> completedFuture(null));
+        storage.startWatches(1, new OnRevisionAppliedCallback() {
+            @Override
+            public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+                // No-op.
+            }
+
+            @Override
+            public CompletableFuture<Void> onRevisionApplied(WatchEvent event) {
+                return completedFuture(null);
+            }
+        });
 
         return resultFuture;
     }
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index fcbf626497..745e0f2143 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -56,7 +56,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
 
     @BeforeEach
     void setUp() {
-        when(revisionCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null));
+        when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
 
         watchProcessor.setRevisionCallback(revisionCallback);
     }
@@ -90,7 +90,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
 
         var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture(), any());
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
 
         WatchEvent event = watchEventCaptor.getValue();
 
@@ -120,7 +120,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
 
         verify(listener1, timeout(1_000)).onUpdate(event);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts);
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
 
         ts = new HybridTimestamp(2, 3);
 
@@ -130,7 +130,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
 
         verify(listener2, timeout(1_000)).onUpdate(event);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts);
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
     }
 
     /**
@@ -156,7 +156,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
         verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2)));
         verify(listener2, timeout(1_000)).onError(any(IllegalStateException.class));
 
-        verify(revisionCallback, never()).onRevisionApplied(any(), any());
+        verify(revisionCallback, never()).onRevisionApplied(any());
     }
 
     /**
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 6760682702..36980dc87e 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -888,4 +888,15 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) {
         return watchProcessor.notifyUpdateRevisionListeners(newRevision);
     }
+
+    @Override
+    public void advanceSafeTime(HybridTimestamp newSafeTime) {
+        synchronized (mux) {
+            if (!areWatchesEnabled) {
+                return;
+            }
+
+            watchProcessor.advanceSafeTime(newSafeTime);
+        }
+    }
 }