You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2023/05/23 11:34:28 UTC

[ignite-3] branch ignite-19532 created (now 0afca2f44f)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a change to branch ignite-19532
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at 0afca2f44f IGNITE-19532 Happens-before for safe time propagation

This branch includes the following new commits:

     new 0afca2f44f IGNITE-19532 Happens-before for safe time propagation

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-3] 01/01: IGNITE-19532 Happens-before for safe time propagation

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-19532
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 0afca2f44ffce8fa3b1392974c1d7fde6feca51f
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue May 23 15:34:18 2023 +0400

    IGNITE-19532 Happens-before for safe time propagation
---
 .../ignite/internal/hlc/HybridTimestamp.java       |   7 +-
 .../impl/ItMetaStorageMultipleNodesTest.java       |  34 ++++---
 .../metastorage/impl/MetaStorageManagerImpl.java   |  11 ++-
 .../server/OnRevisionAppliedCallback.java          |   3 +-
 .../metastorage/server/WatchProcessor.java         |  16 +++-
 .../server/persistence/RocksDbKeyValueStorage.java | 103 ++++++++++++++++++---
 .../persistence/StorageColumnFamilyType.java       |   4 +-
 .../server/raft/MetaStorageWriteHandler.java       |   8 +-
 .../server/BasicOperationsKeyValueStorageTest.java |   8 +-
 .../server/RocksDbKeyValueStorageTest.java         |   2 +-
 .../metastorage/server/WatchProcessorTest.java     |  23 ++---
 .../server/SimpleInMemoryKeyValueStorage.java      |  17 ++--
 12 files changed, 171 insertions(+), 65 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 5cdcebdcd5..cfa133ed50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -82,7 +82,12 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria
         }
     }
 
-    private HybridTimestamp(long time) {
+    /**
+     * The constructor.
+     *
+     * @param time Long time value.
+     */
+    public HybridTimestamp(long time) {
         this.time = time;
 
         // Negative time breaks comparison, we don't allow overflow of the physical time.
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index a817026ba3..68043ee720 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -72,9 +73,7 @@ import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.DefaultMessagingService;
 import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.StaticNodeFinder;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
@@ -193,16 +192,6 @@ public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest {
                     .thenCompose(service -> service.refreshMembers(false).thenApply(v -> service.learners()))
                     .thenApply(learners -> learners.stream().map(Peer::consistentId).collect(toSet()));
         }
-
-        void startDroppingMessagesTo(Node recipient, Class<? extends NetworkMessage> msgType) {
-            ((DefaultMessagingService) clusterService.messagingService())
-                    .dropMessages((recipientConsistentId, message) ->
-                            recipient.name().equals(recipientConsistentId) && msgType.isInstance(message));
-        }
-
-        void stopDroppingMessages() {
-            ((DefaultMessagingService) clusterService.messagingService()).stopDroppingMessages();
-        }
     }
 
     private final List<Node> nodes = new ArrayList<>();
@@ -364,18 +353,37 @@ public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest {
 
         assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
 
+        CompletableFuture<Void> f = new CompletableFuture<>();
+        CountDownLatch l = new CountDownLatch(1);
+
+        secondNode.metaStorageManager.registerExactWatch(ByteArray.fromString("test-key"), new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                l.countDown();
+                return f;
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                // No-op.
+            }
+        });
+
         // Try putting data from both nodes, because any of them can be a leader.
         assertThat(
                 firstNode.metaStorageManager.put(ByteArray.fromString("test-key"), new byte[]{0, 1, 2, 3}),
                 willCompleteSuccessfully()
         );
 
+        assertTrue(l.await(1, TimeUnit.SECONDS));
+        f.complete(null);
+
         assertTrue(waitForCondition(() -> {
             HybridTimestamp sf1 = firstNodeTime.currentSafeTime();
             HybridTimestamp sf2 = secondNodeTime.currentSafeTime();
 
             return sf1.equals(sf2);
-        }, TimeUnit.SECONDS.toMillis(1)));
+        }, 100));
 
         assertThat(
                 secondNode.metaStorageManager.put(ByteArray.fromString("test-key-2"), new byte[]{0, 1, 2, 3}),
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 7bc432a27c..5bf665416d 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -150,7 +151,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            RaftNodeDisruptorConfiguration ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
+            var ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
 
             // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
             if (metaStorageNodes.contains(thisNodeName)) {
@@ -283,7 +284,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
 
         try {
             // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault.
-            storage.startWatches(this::saveUpdatedEntriesToVault);
+            storage.startWatches(this::onRevisionApplied);
         } finally {
             busyLock.leaveBusy();
         }
@@ -599,13 +600,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
     /**
      * Saves processed Meta Storage revision and corresponding entries to the Vault.
      */
-    private CompletableFuture<Void> saveUpdatedEntriesToVault(WatchEvent watchEvent) {
+    private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp time) {
+        assert time != null;
+
         if (!busyLock.enterBusy()) {
             LOG.info("Skipping applying MetaStorage revision because the node is stopping");
 
             return completedFuture(null);
         }
 
+        clusterTime.updateSafeTime(time);
+
         try {
             CompletableFuture<Void> saveToVaultFuture;
 
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index 0ce5b39072..d3e20e328b 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.server;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 
 /**
@@ -31,5 +32,5 @@ public interface OnRevisionAppliedCallback {
      * @param watchEvent Event with modified Meta Storage entries processed at least one Watch.
      * @return Future that represents the state of the execution of the callback.
      */
-    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
+    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp time);
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index aac4330e29..234a14dd08 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -118,7 +119,10 @@ public class WatchProcessor implements ManuallyCloseable {
     /**
      * Notifies registered watch about an update event.
      */
-    public void notifyWatches(List<Entry> updatedEntries) {
+    @SuppressWarnings("unchecked")
+    public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp time) {
+        assert time != null;
+
         notificationFuture = notificationFuture
                 .thenComposeAsync(v -> {
                     // Revision must be the same for all entries.
@@ -130,7 +134,7 @@ public class WatchProcessor implements ManuallyCloseable {
                             .toArray(CompletableFuture[]::new);
 
                     return allOf(notificationFutures)
-                            .thenComposeAsync(ignored -> invokeOnRevisionCallback(notificationFutures, newRevision), watchExecutor);
+                            .thenComposeAsync(ignored -> invokeOnRevisionCallback(notificationFutures, newRevision, time), watchExecutor);
                 }, watchExecutor);
     }
 
@@ -179,7 +183,11 @@ public class WatchProcessor implements ManuallyCloseable {
                 });
     }
 
-    private CompletableFuture<Void> invokeOnRevisionCallback(CompletableFuture<List<EntryEvent>>[] notificationFutures, long revision) {
+    private CompletableFuture<Void> invokeOnRevisionCallback(
+            CompletableFuture<List<EntryEvent>>[] notificationFutures,
+            long revision,
+            HybridTimestamp time
+    ) {
         try {
             // Only notify about entries that have been accepted by at least one Watch.
             var acceptedEntries = new HashSet<EntryEvent>();
@@ -193,7 +201,7 @@ public class WatchProcessor implements ManuallyCloseable {
 
             var event = new WatchEvent(acceptedEntries, revision);
 
-            return revisionCallback.onRevisionApplied(event)
+            return revisionCallback.onRevisionApplied(event, time)
                     .whenComplete((ignored, e) -> {
                         if (e != null) {
                             LOG.error("Error occurred when notifying watches", e);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 0d439a0b24..cc667908e3 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -30,6 +30,7 @@ import static org.apache.ignite.internal.metastorage.server.persistence.RocksSto
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
 import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
 import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
 import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
@@ -47,6 +48,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -156,6 +158,9 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Timestamp to revision mapping column family. */
     private volatile ColumnFamily tsToRevision;
 
+    /** Revision to timestamp mapping column family. */
+    private volatile ColumnFamily revisionToTs;
+
     /** Snapshot manager. */
     private volatile RocksSnapshotManager snapshotManager;
 
@@ -197,14 +202,14 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
      * <p>Multi-threaded access is guarded by {@link #rwLock}.
      */
     @Nullable
-    private List<List<Entry>> eventCache;
+    private List<UpdatedEntries> eventCache;
 
     /**
      * Current list of updated entries.
      *
      * <p>Since this list gets read and updated only on writes (under a write lock), no extra synchronisation is needed.
      */
-    private final List<Entry> updatedEntries = new ArrayList<>();
+    private final UpdatedEntries updatedEntries = new UpdatedEntries();
 
     /**
      * Constructor.
@@ -242,10 +247,14 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
         Options tsToRevOptions = new Options().setCreateIfMissing(true);
         ColumnFamilyOptions tsToRevFamilyOptions = new ColumnFamilyOptions(tsToRevOptions);
 
+        Options revToTsOptions = new Options().setCreateIfMissing(true);
+        ColumnFamilyOptions revToTsFamilyOptions = new ColumnFamilyOptions(revToTsOptions);
+
         return List.of(
                 new ColumnFamilyDescriptor(DATA.nameAsBytes(), dataFamilyOptions),
                 new ColumnFamilyDescriptor(INDEX.nameAsBytes(), indexFamilyOptions),
-                new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), tsToRevFamilyOptions)
+                new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), tsToRevFamilyOptions),
+                new ColumnFamilyDescriptor(REVISION_TO_TS.nameAsBytes(), revToTsFamilyOptions)
         );
     }
 
@@ -254,7 +263,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
 
         List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
 
-        assert descriptors.size() == 3;
+        assert descriptors.size() == 4;
 
         var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
 
@@ -270,8 +279,10 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
 
         tsToRevision = ColumnFamily.wrap(db, handles.get(2));
 
+        revisionToTs = ColumnFamily.wrap(db, handles.get(3));
+
         snapshotManager = new RocksSnapshotManager(db,
-                List.of(fullRange(data), fullRange(index), fullRange(tsToRevision)),
+                List.of(fullRange(data), fullRange(index), fullRange(tsToRevision), fullRange(revisionToTs)),
                 snapshotExecutor
         );
     }
@@ -413,7 +424,10 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
             data.put(batch, REVISION_KEY, revisionBytes);
 
             if (ts != null) {
-                tsToRevision.put(batch, hybridTsToArray(ts), revisionBytes);
+                byte[] tsBytes = hybridTsToArray(ts);
+
+                tsToRevision.put(batch, tsBytes, revisionBytes);
+                revisionToTs.put(batch, revisionBytes, tsBytes);
             }
 
             db.write(opts, batch);
@@ -422,6 +436,8 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
             updCntr = newCntr;
         }
 
+        updatedEntries.ts = ts;
+
         queueWatchEvent();
     }
 
@@ -1329,7 +1345,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
                     eventCache = new ArrayList<>();
                 }
 
-                eventCache.add(List.copyOf(updatedEntries));
+                eventCache.add(updatedEntries.copy());
 
                 updatedEntries.clear();
 
@@ -1343,7 +1359,11 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     }
 
     private void notifyWatches() {
-        watchProcessor.notifyWatches(List.copyOf(updatedEntries));
+        UpdatedEntries copy = updatedEntries.copy();
+
+        assert copy.ts != null;
+
+        watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
 
         updatedEntries.clear();
     }
@@ -1359,6 +1379,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
         }
 
         var updatedEntries = new ArrayList<Entry>();
+        HybridTimestamp ts = null;
 
         try (
                 var upperBound = new Slice(longToBytes(upperRevision + 1));
@@ -1379,7 +1400,9 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
                     if (!updatedEntries.isEmpty()) {
                         var updatedEntriesCopy = List.copyOf(updatedEntries);
 
-                        watchProcessor.notifyWatches(updatedEntriesCopy);
+                        assert ts != null;
+
+                        watchProcessor.notifyWatches(updatedEntriesCopy, ts);
 
                         updatedEntries.clear();
                     }
@@ -1387,6 +1410,10 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
                     lastSeenRevision = revision;
                 }
 
+                if (ts == null) {
+                    ts = timestampByRevision(revision);
+                }
+
                 updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision, bytesToValue(rocksValue)));
             }
 
@@ -1394,13 +1421,27 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
 
             // Notify about the events left after finishing the cycle above.
             if (!updatedEntries.isEmpty()) {
-                watchProcessor.notifyWatches(updatedEntries);
+                assert ts != null;
+
+                watchProcessor.notifyWatches(updatedEntries, ts);
             }
         }
 
         finishReplay();
     }
 
+    private HybridTimestamp timestampByRevision(long revision) {
+        try {
+            byte[] tsBytes = revisionToTs.get(longToBytes(revision));
+
+            assert tsBytes != null;
+
+            return new HybridTimestamp(bytesToLong(tsBytes));
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private void finishReplay() {
         // Take the lock to drain the event cache and prevent new events from being cached. Since event notification is asynchronous,
         // this lock shouldn't be held for long.
@@ -1408,7 +1449,11 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
 
         try {
             if (eventCache != null) {
-                eventCache.forEach(watchProcessor::notifyWatches);
+                eventCache.forEach(entries -> {
+                    assert entries.ts != null;
+
+                    watchProcessor.notifyWatches(entries.updatedEntries, entries.ts);
+                });
 
                 eventCache = null;
             }
@@ -1423,4 +1468,40 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     public Path getDbPath() {
         return dbPath;
     }
+
+    private static class UpdatedEntries {
+        private final List<Entry> updatedEntries;
+
+        @Nullable
+        private HybridTimestamp ts;
+
+        public UpdatedEntries() {
+            this.updatedEntries = new ArrayList<>();
+        }
+
+        private UpdatedEntries(List<Entry> updatedEntries, HybridTimestamp ts) {
+            this.updatedEntries = updatedEntries;
+            this.ts = Objects.requireNonNull(ts);
+        }
+
+        public boolean isEmpty() {
+            return updatedEntries.isEmpty();
+        }
+
+        public boolean add(Entry entry) {
+            return updatedEntries.add(entry);
+        }
+
+        public void clear() {
+            updatedEntries.clear();
+
+            ts = null;
+        }
+
+        public UpdatedEntries copy() {
+            assert ts != null;
+
+            return new UpdatedEntries(new ArrayList<>(updatedEntries), ts);
+        }
+    }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
index 958b73fb88..d1d92b99c9 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
@@ -31,7 +31,9 @@ enum StorageColumnFamilyType {
     INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
 
     /** Column family for the timestamp to revision mapping. */
-    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8));
+    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),
+
+    REVISION_TO_TS("REVTOTTS".getBytes(StandardCharsets.UTF_8));
 
     /** Byte representation of the column family's name. */
     private final byte[] nameAsBytes;
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index ca80ffc46a..6354ece930 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -88,13 +88,9 @@ class MetaStorageWriteHandler {
                 safeTime = cmdWithTime.safeTime();
 
                 handleWriteWithTime(clo, cmdWithTime, safeTime);
-
-                // Every MetaStorageWriteCommand holds safe time that we should set as the cluster time.
-                clusterTime.updateSafeTime(safeTime);
             } else if (command instanceof SyncTimeCommand) {
-                clusterTime.updateSafeTime(((SyncTimeCommand) command).safeTime());
-
-                clo.result(null);
+                // TODO: IGNITE-19199 WatchProcessor must be notified of the new safe time.
+                throw new UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-19199");
             } else {
                 assert false : "Command was not found [cmd=" + command + ']';
             }
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 27033f9685..fd2318d8ff 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1887,7 +1887,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
 
         long appliedRevision = storage.revision();
 
-        storage.startWatches(event -> completedFuture(null));
+        storage.startWatches((event, ts) -> completedFuture(null));
 
         CompletableFuture<byte[]> fut = new CompletableFuture<>();
 
@@ -2226,7 +2226,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
 
         OnRevisionAppliedCallback mockCallback = mock(OnRevisionAppliedCallback.class);
 
-        when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
+        when(mockCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null));
 
         storage.startWatches(mockCallback);
 
@@ -2238,7 +2238,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
 
         verify(mockListener3, timeout(10_000)).onUpdate(any());
 
-        verify(mockCallback, never()).onRevisionApplied(any());
+        verify(mockCallback, never()).onRevisionApplied(any(), any());
     }
 
     @Test
@@ -2423,7 +2423,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
             }
         });
 
-        storage.startWatches(event -> completedFuture(null));
+        storage.startWatches((event, ts) -> completedFuture(null));
 
         return resultFuture;
     }
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 31baeb0f31..ff02241d07 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -102,7 +102,7 @@ public class RocksDbKeyValueStorageTest extends BasicOperationsKeyValueStorageTe
             }
         });
 
-        storage.startWatches(event -> CompletableFuture.completedFuture(null));
+        storage.startWatches((event, ts) -> CompletableFuture.completedFuture(null));
 
         storage.restoreSnapshot(snapshotPath);
 
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index 1e64ecd803..346da4712a 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -55,7 +56,7 @@ public class WatchProcessorTest {
 
     @BeforeEach
     void setUp() {
-        when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
+        when(revisionCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null));
 
         watchProcessor.setRevisionCallback(revisionCallback);
     }
@@ -79,7 +80,7 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), HybridTimestamp.MAX_VALUE);
 
         var entryEvent1 = new EntryEvent(oldEntry(entry1), entry1);
         var entryEvent2 = new EntryEvent(oldEntry(entry2), entry2);
@@ -92,7 +93,7 @@ public class WatchProcessorTest {
 
         var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture(), any());
 
         WatchEvent event = watchEventCaptor.getValue();
 
@@ -114,23 +115,23 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1));
+        watchProcessor.notifyWatches(List.of(entry1), HybridTimestamp.MAX_VALUE);
 
         var event = new WatchEvent(new EntryEvent(oldEntry(entry1), entry1));
 
         verify(listener1, timeout(1_000)).onUpdate(event);
         verify(listener2, timeout(1_000)).onRevisionUpdated(1);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, any());
 
-        watchProcessor.notifyWatches(List.of(entry2));
+        watchProcessor.notifyWatches(List.of(entry2), HybridTimestamp.MAX_VALUE);
 
         event = new WatchEvent(new EntryEvent(oldEntry(entry2), entry2));
 
         verify(listener1, timeout(1_000)).onRevisionUpdated(2);
         verify(listener2, timeout(1_000)).onUpdate(event);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, any());
     }
 
     /**
@@ -150,13 +151,13 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), HybridTimestamp.MAX_VALUE);
 
         verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry1), entry1)));
         verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2)));
         verify(listener2, timeout(1_000)).onError(any(IllegalStateException.class));
 
-        verify(revisionCallback, never()).onRevisionApplied(any());
+        verify(revisionCallback, never()).onRevisionApplied(any(), any());
     }
 
     /**
@@ -182,7 +183,7 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), HybridTimestamp.MAX_VALUE);
 
         verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry1), entry1)));
         verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2)));
@@ -190,7 +191,7 @@ public class WatchProcessorTest {
         var entry3 = new EntryImpl("foo".getBytes(UTF_8), null, 2, 0);
         var entry4 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
 
-        watchProcessor.notifyWatches(List.of(entry3, entry4));
+        watchProcessor.notifyWatches(List.of(entry3, entry4), HybridTimestamp.MAX_VALUE);
 
         verify(listener1, never()).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry3), entry3)));
         verify(listener2, never()).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry4), entry4)));
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 2096e9ab4e..6f2c404191 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -64,6 +64,9 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     /** Timestamp to revision mapping. */
     private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
 
+    /** Revision to timestamp mapping. */
+    private final NavigableMap<Long, Long> revToTsMap = new TreeMap<>();
+
     /** Revisions index. Value contains all entries which were modified under particular revision. */
     private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
 
@@ -120,6 +123,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         rev = newRevision;
 
         tsToRevMap.put(ts.longValue(), rev);
+        revToTsMap.put(rev, ts.longValue());
 
         notifyWatches();
     }
@@ -468,7 +472,10 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
             return;
         }
 
-        watchProcessor.notifyWatches(List.copyOf(updatedEntries));
+        Long tsLong = revToTsMap.get(updatedEntries.get(0).revision());
+        assert tsLong != null;
+
+        watchProcessor.notifyWatches(List.copyOf(updatedEntries), new HybridTimestamp(tsLong));
 
         updatedEntries.clear();
     }
@@ -748,12 +755,4 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     private static long lastRevision(List<Long> revs) {
         return revs.get(revs.size() - 1);
     }
-
-    private static List<Long> listOf(long val) {
-        List<Long> res = new ArrayList<>();
-
-        res.add(val);
-
-        return res;
-    }
 }