You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "SammyVimes (via GitHub)" <gi...@apache.org> on 2023/05/03 19:22:21 UTC

[GitHub] [ignite-3] SammyVimes opened a new pull request, #2019: IGNITE-14734 Implement compaction functionality management for meta storage

SammyVimes opened a new pull request, #2019:
URL: https://github.com/apache/ignite-3/pull/2019

   https://issues.apache.org/jira/browse/IGNITE-14734


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1187237380


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -975,30 +1020,104 @@ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is a tombstone.
+     * Compacts all entries by the given key, removing revision that are no longer needed.
+     * Last entry with a revision lesser or equal to the {@code minRevisionToKeep} and all consecutive entries will be preserved.
+     * If the first entry to keep is a tombstone, it will be removed.
+     * Example:
+     * <pre>
+     * Example 1:
+     *     put entry1: revision 5
+     *     put entry2: revision 7
+     *
+     *     do compaction: revision 6
+     *
+     *     entry1: exists
+     *     entry2: exists
+     *
+     * Example 2:
+     *     put entry1: revision 5
+     *     put entry2: revision 7
+     *
+     *     do compaction: revision 7
+     *
+     *     entry1: doesn't exist
+     *     entry2: exists
+     * </pre>
      *
      * @param batch Write batch.
      * @param key   Target key.
      * @param revs  Revisions.
+     * @param minRevisionToKeep Minimum revision that should be kept.
      * @throws RocksDBException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
-        long lastRev = lastRevision(revs);
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long minRevisionToKeep) throws RocksDBException {
+        if (revs.length < 2) {
+            // If we have less than two revisions, there is no point in compaction.
+            return;
+        }
+
+        // Index of the first revision we will be keeping in the array of revisions.
+        int idxToKeepFrom = 0;
+
+        // Whether there is an entry with the minRevisionToKeep.
+        boolean hasMinRevision = false;
+
+        // Traverse revisions, looking for the first revision that needs to be kept.
+        for (long rev : revs) {
+            if (rev >= minRevisionToKeep) {
+                if (rev == minRevisionToKeep) {
+                    hasMinRevision = true;
+                }
+                break;
+            }
+
+            idxToKeepFrom++;
+        }

Review Comment:
   Why? It's a couple of lines, no need to share it with test implementation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1187240638


##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value2, clock.now());
+
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Last operation was remove, so this is a tombstone.
+        Entry entry3 = storage.get(key, lastRevision);
+        assertTrue(entry3.tombstone());
+
+        Entry entry2 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 2);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactEmptyStorage() {
+        storage.compact(clock.now());

Review Comment:
   That no exception is thrown if we are compacting empty storage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185765578


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */

Review Comment:
   I'm hinting at the fact that the revision of what is not clear, and I ask you to specify a little more in the documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185765956


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */
+    private volatile ColumnFamily revisionToTs;

Review Comment:
   Let's delete it, because it's not needed now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185440388


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -156,8 +183,7 @@ static byte[] valueToBytes(byte[] value, long updateCounter) {
      * @param bytes Byte array of longs.
      * @return Array of longs.
      */
-    @NotNull
-    static long[] getAsLongs(byte[] bytes) {
+    static long @NotNull [] getAsLongs(byte[] bytes) {

Review Comment:
   NotNull is not prohibited, is it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185845800


##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BaseKeyValueStorageTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract test for {@link KeyValueStorage}.
+ */
+public abstract class BaseKeyValueStorageTest {

Review Comment:
   I prefer it that way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185442748


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */

Review Comment:
   Metastorage, since it is a storage of the metastorage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185443726


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -404,14 +440,22 @@ private void fillAndWriteBatch(WriteBatch batch, long newRev, long newCntr) thro
         queueWatchEvent();
     }
 
+    private byte[] hybridTsToArray(HybridTimestamp ts) {
+        byte[] array = new byte[Long.BYTES];
+
+        putLongToBytes(ts.longValue(), array, 0);
+
+        return array;
+    }

Review Comment:
   Nice



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185836380


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */

Review Comment:
   It is obvious, there can not be any other revisions in the metastorage, there's such thing as overdocumenting)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185274821


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java:
##########
@@ -233,9 +244,13 @@ public interface KeyValueStorage extends ManuallyCloseable {
 
     /**
      * Compacts storage (removes tombstones).
-     * TODO: IGNITE-16444 Correct compaction for Meta storage.
+     *
+     * @param compactionWatermark A time threshold for the entry. Only entries that have revisions with timestamp higher or equal to the
+     *     watermark can be removed.
      */
-    void compact();
+    // TODO: IGNITE-16444 Correct compaction for Meta storage.
+    // TODO: IGNITE-19417 Provide low-watermark for compaction.
+    void compact(HybridTimestamp compactionWatermark);

Review Comment:
   Can also be called: low watermark?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -50,11 +55,19 @@ class RocksStorageUtils {
     static byte[] longToBytes(long value) {
         var buffer = new byte[Long.BYTES];
 
-        LONG_ARRAY_HANDLE.set(buffer, 0, value);
+        putLongToBytes(value, buffer, 0);
 
         return buffer;
     }
 
+    static void putLongToBytes(long value, byte[] buffer, int position) {

Review Comment:
   Missing javadoc.
   Can add `assert` that the array is not less than the `Integer#BYTES`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -188,4 +214,18 @@ static long[] getAsLongs(byte[] bytes) {
 
         return result;
     }
+
+    static byte @NotNull [] longsToBytes(long @NotNull [] values, int valuesOffset) {

Review Comment:
   Missing javadoc.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -64,7 +77,21 @@ static byte[] longToBytes(long value) {
     static long bytesToLong(byte[] array) {
         assert array.length == Long.BYTES;
 
-        return (long) LONG_ARRAY_HANDLE.get(array, 0);
+        return bytesToLong(array, 0);
+    }
+
+    static long bytesToLong(byte[] array, int offset) {
+        return (long) LONG_ARRAY_HANDLE.get(array, offset);
+    }
+
+    /**
+     * Converts a byte array to a long value.
+     *
+     * @param array Byte array.
+     * @return Long value.
+     */
+    static int bytesToInt(byte[] array, int offset) {

Review Comment:
   Can add `assert` that the array is not less than the `Integer#BYTES`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */

Review Comment:
   Revision of what?)



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -156,8 +183,7 @@ static byte[] valueToBytes(byte[] value, long updateCounter) {
      * @param bytes Byte array of longs.
      * @return Array of longs.
      */
-    @NotNull
-    static long[] getAsLongs(byte[] bytes) {
+    static long @NotNull [] getAsLongs(byte[] bytes) {

Review Comment:
   ```suggestion
       static long[] getAsLongs(byte[] bytes) {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -929,17 +973,32 @@ public void removeWatch(WatchListener listener) {
     }
 
     @Override
-    public void compact() {
+    public void compact(HybridTimestamp compactionWatermark) {
         rwLock.writeLock().lock();
 
+        byte[] tsBytes = hybridTsToArray(compactionWatermark);
+
         try (WriteBatch batch = new WriteBatch()) {
+            long maxRevision;
+
+            // Find a revision with timestamp lesser or equal to the watermark.
+            try (RocksIterator rocksIterator = tsToRevision.newIterator()) {
+                rocksIterator.seekForPrev(tsBytes);
+
+                RocksUtils.checkIterator(rocksIterator);
+
+                maxRevision = bytesToLong(rocksIterator.value());
+            }
+
+            long maxCompactionRevision = maxRevision;

Review Comment:
   Why another variable?)



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -929,17 +973,32 @@ public void removeWatch(WatchListener listener) {
     }
 
     @Override
-    public void compact() {
+    public void compact(HybridTimestamp compactionWatermark) {
         rwLock.writeLock().lock();
 
+        byte[] tsBytes = hybridTsToArray(compactionWatermark);
+
         try (WriteBatch batch = new WriteBatch()) {
+            long maxRevision;

Review Comment:
   Please add more comments for what it is.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -975,30 +1034,59 @@ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is a tombstone.
+     * Compacts all entries by the given key, removing all previous revisions lesser or equal to the revision watermark and
+     * deleting the last entry if it is a tombstone.
      *
      * @param batch Write batch.
      * @param key   Target key.
      * @param revs  Revisions.
+     * @param revisionWatermark Maximum revision that can be removed.
      * @throws RocksDBException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long revisionWatermark) throws RocksDBException {
         long lastRev = lastRevision(revs);
 
+        int idxToKeepFrom = 0;
+
         for (int i = 0; i < revs.length - 1; i++) {
-            data.delete(batch, keyToRocksKey(revs[i], key));
+            long rev = revs[i];
+
+            if (rev > revisionWatermark) {
+                break;
+            }
+
+            // This revision is not needed anymore, remove data.
+            data.delete(batch, keyToRocksKey(rev, key));
+
+            idxToKeepFrom++;
         }
 
-        byte[] rocksKey = keyToRocksKey(lastRev, key);
+        // Whether we only have last revision (even if it's lesser or equal to watermark).

Review Comment:
   The code below is hard to read, I believe it can be simplified a bit and documented.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -50,11 +55,19 @@ class RocksStorageUtils {
     static byte[] longToBytes(long value) {
         var buffer = new byte[Long.BYTES];
 
-        LONG_ARRAY_HANDLE.set(buffer, 0, value);
+        putLongToBytes(value, buffer, 0);
 
         return buffer;
     }
 
+    static void putLongToBytes(long value, byte[] buffer, int position) {
+        LONG_ARRAY_HANDLE.set(buffer, position, value);
+    }
+
+    static void putIntToBytes(int value, byte[] buffer, int position) {

Review Comment:
   Missing javadoc.
   Can add `assert` that the array is not less than the `Integer#BYTES`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -188,4 +214,18 @@ static long[] getAsLongs(byte[] bytes) {
 
         return result;
     }
+
+    static byte @NotNull [] longsToBytes(long @NotNull [] values, int valuesOffset) {

Review Comment:
   ```suggestion
       static byte[] longsToBytes(long[] values, int valuesOffset) {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java:
##########
@@ -28,7 +28,11 @@ enum StorageColumnFamilyType {
     DATA(RocksDB.DEFAULT_COLUMN_FAMILY),
 
     /** Column family for the index. Index is a mapping from entry key to a list of revisions of the storage. */
-    INDEX("INDEX".getBytes(StandardCharsets.UTF_8));
+    INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
+
+    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),

Review Comment:
   Missing javadoc



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -64,7 +77,21 @@ static byte[] longToBytes(long value) {
     static long bytesToLong(byte[] array) {
         assert array.length == Long.BYTES;
 
-        return (long) LONG_ARRAY_HANDLE.get(array, 0);
+        return bytesToLong(array, 0);
+    }
+
+    static long bytesToLong(byte[] array, int offset) {

Review Comment:
   Missing javadoc.
   Can add `assert` that the array is not less than the `Long#BYTES`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */

Review Comment:
   Revision of what?)



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java:
##########
@@ -28,7 +28,11 @@ enum StorageColumnFamilyType {
     DATA(RocksDB.DEFAULT_COLUMN_FAMILY),
 
     /** Column family for the index. Index is a mapping from entry key to a list of revisions of the storage. */
-    INDEX("INDEX".getBytes(StandardCharsets.UTF_8));
+    INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
+
+    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),
+
+    REVISION_TO_TS("REVTOTTS".getBytes(StandardCharsets.UTF_8));

Review Comment:
   Missing javadoc



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -404,14 +440,22 @@ private void fillAndWriteBatch(WriteBatch batch, long newRev, long newCntr) thro
         queueWatchEvent();
     }
 
+    private byte[] hybridTsToArray(HybridTimestamp ts) {
+        byte[] array = new byte[Long.BYTES];
+
+        putLongToBytes(ts.longValue(), array, 0);
+
+        return array;
+    }

Review Comment:
   ```suggestion
       private static byte[] hybridTsToBytes(HybridTimestamp ts) {
           return longToBytes(ts.longValue());
       }
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -231,12 +242,21 @@ private static List<ColumnFamilyDescriptor> cfDescriptors() {
         ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
 
         Options indexOptions = new Options().setCreateIfMissing(true);
-
         ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
 
+        Options tsToRevOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Integer.BYTES + Long.BYTES);
+        ColumnFamilyOptions tsToRevFamilyOptions = new ColumnFamilyOptions(tsToRevOptions);
+
+        Options revToTsOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Long.BYTES);

Review Comment:
   Shouldn't it be `+ Integer.BYTES`?
   Also maybe use: `org.apache.ignite.internal.hlc.HybridTimestamp#HYBRID_TIMESTAMP_SIZE`.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -245,7 +265,7 @@ private void recreateDb() throws RocksDBException {
 
         List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
 
-        assert descriptors.size() == 2;
+        assert descriptors.size() == 4;

Review Comment:
   Nice



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -231,12 +242,21 @@ private static List<ColumnFamilyDescriptor> cfDescriptors() {
         ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
 
         Options indexOptions = new Options().setCreateIfMissing(true);
-
         ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
 
+        Options tsToRevOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Integer.BYTES + Long.BYTES);

Review Comment:
   Maybe use: `org.apache.ignite.internal.hlc.HybridTimestamp#HYBRID_TIMESTAMP_SIZE`.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -975,30 +1034,59 @@ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is a tombstone.
+     * Compacts all entries by the given key, removing all previous revisions lesser or equal to the revision watermark and
+     * deleting the last entry if it is a tombstone.
      *
      * @param batch Write batch.
      * @param key   Target key.
      * @param revs  Revisions.
+     * @param revisionWatermark Maximum revision that can be removed.
      * @throws RocksDBException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long revisionWatermark) throws RocksDBException {

Review Comment:
   Why watermark?)



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */
+    private volatile ColumnFamily revisionToTs;

Review Comment:
   Maybe I'm looking at it wrong, but it seems you are only adding data to this `ColumnFamily` and doing nothing else? Does it need?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185445073


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */
+    private volatile ColumnFamily revisionToTs;

Review Comment:
   It is not needed now, but it will be needed soon enough (probably in the next change to metastorage). However, I can remove it and it will not affect anything



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185836380


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */

Review Comment:
   It is obvious, there can not be any other revisions in the metastorage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185845800


##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BaseKeyValueStorageTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract test for {@link KeyValueStorage}.
+ */
+public abstract class BaseKeyValueStorageTest {

Review Comment:
   I prefer it that way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1187262290


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -975,30 +1020,104 @@ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is a tombstone.
+     * Compacts all entries by the given key, removing revision that are no longer needed.
+     * Last entry with a revision lesser or equal to the {@code minRevisionToKeep} and all consecutive entries will be preserved.
+     * If the first entry to keep is a tombstone, it will be removed.
+     * Example:
+     * <pre>
+     * Example 1:
+     *     put entry1: revision 5
+     *     put entry2: revision 7
+     *
+     *     do compaction: revision 6
+     *
+     *     entry1: exists
+     *     entry2: exists
+     *
+     * Example 2:
+     *     put entry1: revision 5
+     *     put entry2: revision 7
+     *
+     *     do compaction: revision 7
+     *
+     *     entry1: doesn't exist
+     *     entry2: exists
+     * </pre>
      *
      * @param batch Write batch.
      * @param key   Target key.
      * @param revs  Revisions.
+     * @param minRevisionToKeep Minimum revision that should be kept.
      * @throws RocksDBException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
-        long lastRev = lastRevision(revs);
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long minRevisionToKeep) throws RocksDBException {
+        if (revs.length < 2) {
+            // If we have less than two revisions, there is no point in compaction.
+            return;
+        }
+
+        // Index of the first revision we will be keeping in the array of revisions.
+        int idxToKeepFrom = 0;
+
+        // Whether there is an entry with the minRevisionToKeep.
+        boolean hasMinRevision = false;
+
+        // Traverse revisions, looking for the first revision that needs to be kept.
+        for (long rev : revs) {
+            if (rev >= minRevisionToKeep) {
+                if (rev == minRevisionToKeep) {
+                    hasMinRevision = true;
+                }
+                break;
+            }
+
+            idxToKeepFrom++;
+        }

Review Comment:
   It's just a "can it be done?" question.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes merged pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes merged PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1186821360


##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BaseKeyValueStorageTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract test for {@link KeyValueStorage}.
+ */
+public abstract class BaseKeyValueStorageTest {

Review Comment:
   Ok, but this will make a huge git diff and nothing can be done with it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185772474


##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -60,6 +61,10 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     /** Keys index. Value is the list of all revisions under which entry corresponding to the key was modified. */
     private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
 
+    private NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();

Review Comment:
   ```suggestion
       private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
   ```



##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -60,6 +61,10 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     /** Keys index. Value is the list of all revisions under which entry corresponding to the key was modified. */
     private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
 
+    private NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
+
+    private NavigableMap<Long, Long> revToTsMap = new TreeMap<>();

Review Comment:
   I think we can get rid of him for now.



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+/** Compaction test for the simple in-memory implementation of {@link KeyValueStorage}. */
+public class SimpleInMemoryCompactionKeyValueStorageTest extends CompactionKeyValueStorageTest {
+    /** {@inheritDoc} */

Review Comment:
   ```suggestion
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Compaction test for the RocksDB implementation of {@link KeyValueStorage}. */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbCompactionKeyValueStorageTest extends CompactionKeyValueStorageTest {
+    @WorkDirectory
+    private Path workDir;
+
+    /** {@inheritDoc} */

Review Comment:
   ```suggestion
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BaseKeyValueStorageTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract test for {@link KeyValueStorage}.
+ */
+public abstract class BaseKeyValueStorageTest {
+    protected KeyValueStorage storage;
+
+    /**
+     * Before each.
+     */
+    @BeforeEach
+    public void setUp() {
+        storage = storage();
+
+        storage.start();
+    }
+
+    /**
+     * After each.
+     */
+    @AfterEach
+    void tearDown() throws Exception {
+        storage.close();
+    }
+
+    /**
+     * Returns key value storage for this test.
+     */
+    abstract KeyValueStorage storage();

Review Comment:
   ```suggestion
       abstract KeyValueStorage createStorage();
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void test1() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test2() {

Review Comment:
   please rename



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void test1() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test2() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test3() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+        HybridTimestamp ts = clock.now();
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Previous value, must be removed due to compaction.
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+    }
+
+    @Test
+    public void test4() {

Review Comment:
   please rename



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void test1() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test2() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test3() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+        HybridTimestamp ts = clock.now();
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Previous value, must be removed due to compaction.
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());

Review Comment:
   Shouldn't the value3 have remained? `compactTs` time is less than it.



##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -60,6 +61,10 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     /** Keys index. Value is the list of all revisions under which entry corresponding to the key was modified. */
     private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
 
+    private NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();

Review Comment:
   Missing javadoc.



##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -518,23 +528,45 @@ private void compactForKey(
             byte[] key,
             List<Long> revs,
             Map<byte[], List<Long>> compactedKeysIdx,
-            Map<Long, NavigableMap<byte[], Value>> compactedRevsIdx
+            Map<Long, NavigableMap<byte[], Value>> compactedRevsIdx,
+            long maxRevision
     ) {
-        Long lastRev = lastRevision(revs);
+        List<Long> revsToKeep = new ArrayList<>();
+
+        boolean firstToKeep = true;
+
+        for (int i = 0; i < revs.size(); i++) {
+            long rev = revs.get(i);
 
-        NavigableMap<byte[], Value> kv = revsIdx.get(lastRev);
+            if (rev > maxRevision || (i == revs.size() - 1)) {
+                // If this revision is higher than max revision or is the last revision, we may need to keep it.
+                NavigableMap<byte[], Value> kv = revsIdx.get(rev);
 
-        Value lastVal = kv.get(key);
+                Value value = kv.get(key);
 
-        if (!lastVal.tombstone()) {
-            compactedKeysIdx.put(key, listOf(lastRev));
+                if (firstToKeep) {

Review Comment:
   why should we keep the first one?



##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -473,13 +481,15 @@ public void removeWatch(WatchListener listener) {
     }
 
     @Override
-    public void compact() {
+    public void compact(HybridTimestamp lowWatermark) {
         synchronized (mux) {
             NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(CMP);
 
             NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>();
 
-            keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx));
+            long maxRevision = tsToRevMap.floorEntry(lowWatermark.longValue()).getValue();

Review Comment:
   It seems that if the map is empty, then there will be an NPE.



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BaseKeyValueStorageTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract test for {@link KeyValueStorage}.
+ */
+public abstract class BaseKeyValueStorageTest {
+    protected KeyValueStorage storage;
+
+    /**
+     * Before each.
+     */

Review Comment:
   ```suggestion
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void test1() {

Review Comment:
   please renemae



##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -518,23 +528,45 @@ private void compactForKey(
             byte[] key,
             List<Long> revs,
             Map<byte[], List<Long>> compactedKeysIdx,
-            Map<Long, NavigableMap<byte[], Value>> compactedRevsIdx
+            Map<Long, NavigableMap<byte[], Value>> compactedRevsIdx,
+            long maxRevision
     ) {
-        Long lastRev = lastRevision(revs);
+        List<Long> revsToKeep = new ArrayList<>();
+
+        boolean firstToKeep = true;
+
+        for (int i = 0; i < revs.size(); i++) {
+            long rev = revs.get(i);
 
-        NavigableMap<byte[], Value> kv = revsIdx.get(lastRev);
+            if (rev > maxRevision || (i == revs.size() - 1)) {

Review Comment:
   The code below is difficult to read, needs to be rewritten and simplified.



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BaseKeyValueStorageTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract test for {@link KeyValueStorage}.
+ */
+public abstract class BaseKeyValueStorageTest {
+    protected KeyValueStorage storage;
+
+    /**
+     * Before each.
+     */
+    @BeforeEach
+    public void setUp() {
+        storage = storage();
+
+        storage.start();
+    }
+
+    /**
+     * After each.
+     */

Review Comment:
   ```suggestion
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void test1() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test2() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test3() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+        HybridTimestamp ts = clock.now();
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());

Review Comment:
   ```suggestion
           storage.put(key, value1, clock.now());
           storage.put(key, value2, clock.now());
           
           HybridTimestamp compactTs = clock.now();
           
           storage.put(key, value3, clock.now());
           storage.put(key, value4, clock.now());
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+

Review Comment:
   ```suggestion
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void test1() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test2() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test3() {

Review Comment:
   please rename



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void test1() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test2() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test3() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+        HybridTimestamp ts = clock.now();
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Previous value, must be removed due to compaction.
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+    }
+
+    @Test
+    public void test4() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+
+        HybridTimestamp now1 = clock.now();
+        storage.put(key, value1, now1);
+        HybridTimestamp now2 = clock.now();
+        storage.put(key, value2, now2);
+        HybridTimestamp ts = clock.now();
+        HybridTimestamp now3 = clock.now();
+        storage.remove(key, now3);
+        HybridTimestamp now4 = clock.now();
+        storage.put(key, value3, now4);
+        storage.remove(key, clock.now());

Review Comment:
   ```suggestion
           storage.put(key, value1, clock.now());        
           storage.put(key, value2, clock.now());
           
           HybridTimestamp comactTs = clock.now();
           
           storage.remove(key, clock.now());
   
           storage.put(key, value3, clock.now());
           
           storage.remove(key, clock.now());
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BaseKeyValueStorageTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Abstract test for {@link KeyValueStorage}.
+ */
+public abstract class BaseKeyValueStorageTest {

Review Comment:
   ```suggestion
   public abstract class AbstractKeyValueStorageTest {
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends BaseKeyValueStorageTest {
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void test1() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test2() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void test3() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+        HybridTimestamp ts = clock.now();
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Previous value, must be removed due to compaction.
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+    }
+
+    @Test
+    public void test4() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+
+        HybridTimestamp now1 = clock.now();
+        storage.put(key, value1, now1);
+        HybridTimestamp now2 = clock.now();
+        storage.put(key, value2, now2);
+        HybridTimestamp ts = clock.now();
+        HybridTimestamp now3 = clock.now();
+        storage.remove(key, now3);
+        HybridTimestamp now4 = clock.now();
+        storage.put(key, value3, now4);
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Previous value, must be removed due to compaction.
+        Entry entry4 = storage.get(key, lastRevision - 1);

Review Comment:
   Please check other values



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185454765


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */

Review Comment:
   Metastorage, since it is a storage of the metastorage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185454420


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -975,30 +1034,59 @@ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is a tombstone.
+     * Compacts all entries by the given key, removing all previous revisions lesser or equal to the revision watermark and
+     * deleting the last entry if it is a tombstone.
      *
      * @param batch Write batch.
      * @param key   Target key.
      * @param revs  Revisions.
+     * @param revisionWatermark Maximum revision that can be removed.
      * @throws RocksDBException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long revisionWatermark) throws RocksDBException {
         long lastRev = lastRevision(revs);
 
+        int idxToKeepFrom = 0;
+
         for (int i = 0; i < revs.length - 1; i++) {
-            data.delete(batch, keyToRocksKey(revs[i], key));
+            long rev = revs[i];
+
+            if (rev > revisionWatermark) {
+                break;
+            }
+
+            // This revision is not needed anymore, remove data.
+            data.delete(batch, keyToRocksKey(rev, key));
+
+            idxToKeepFrom++;
         }
 
-        byte[] rocksKey = keyToRocksKey(lastRev, key);
+        // Whether we only have last revision (even if it's lesser or equal to watermark).

Review Comment:
   Documented it, but I don't see how the code can be changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185764798


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -156,8 +183,7 @@ static byte[] valueToBytes(byte[] value, long updateCounter) {
      * @param bytes Byte array of longs.
      * @return Array of longs.
      */
-    @NotNull
-    static long[] getAsLongs(byte[] bytes) {
+    static long @NotNull [] getAsLongs(byte[] bytes) {

Review Comment:
   We do not use `@NotNull`:
   https://cwiki.apache.org/confluence/display/IGNITE/Java+Code+Style+Guide#JavaCodeStyleGuide-2.1Commonannotatingrules



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] SammyVimes commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "SammyVimes (via GitHub)" <gi...@apache.org>.
SammyVimes commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185443363


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -231,12 +242,21 @@ private static List<ColumnFamilyDescriptor> cfDescriptors() {
         ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
 
         Options indexOptions = new Options().setCreateIfMissing(true);
-
         ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
 
+        Options tsToRevOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Integer.BYTES + Long.BYTES);
+        ColumnFamilyOptions tsToRevFamilyOptions = new ColumnFamilyOptions(tsToRevOptions);
+
+        Options revToTsOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Long.BYTES);

Review Comment:
   We don't actually need it at all, I removed it



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -231,12 +242,21 @@ private static List<ColumnFamilyDescriptor> cfDescriptors() {
         ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
 
         Options indexOptions = new Options().setCreateIfMissing(true);
-
         ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
 
+        Options tsToRevOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Integer.BYTES + Long.BYTES);

Review Comment:
   Already removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1187105941


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -188,4 +210,25 @@ static long[] getAsLongs(byte[] bytes) {
 
         return result;
     }
+
+    /**
+     * Converts an array of {@code long} values to an array of bytes.
+     *
+     * @param values Array of values.
+     * @param valuesOffset Offset in the array of values to start from.
+     * @return Array of bytes.
+     */
+    static byte[] longsToBytes(long[] values, int valuesOffset) {
+        assert valuesOffset < values.length;

Review Comment:
   ```suggestion
           assert valuesOffset < values.length: "off=" + valuesOffset + ", arr.len=" + values.length;
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -64,7 +76,18 @@ static byte[] longToBytes(long value) {
     static long bytesToLong(byte[] array) {
         assert array.length == Long.BYTES;
 
-        return (long) LONG_ARRAY_HANDLE.get(array, 0);
+        return bytesToLong(array, 0);
+    }
+
+    /**
+     * Gets a {@code long} value from a byte array starting from the specified position.
+     *
+     * @param array Byte array.
+     * @param offset Offset to read a value from.
+     * @return {@code long} value.
+     */
+    static long bytesToLong(byte[] array, int offset) {
+        return (long) LONG_ARRAY_HANDLE.get(array, offset);

Review Comment:
   ```suggestion
       static long bytesToLong(byte[] array, int offset) {
           assert offset + Long.BYTES <= array.length: "off=" + offset + ", arr.len=" + array.length;
           
           return (long) LONG_ARRAY_HANDLE.get(array, offset);
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.

Review Comment:
   ```suggestion
           // Previous value, must be removed due to compaction with last tombstone.
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();

Review Comment:
   ```suggestion
           HybridTimestamp compactTs = clock.now();
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value2, clock.now());
+
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Last operation was remove, so this is a tombstone.
+        Entry entry3 = storage.get(key, lastRevision);
+        assertTrue(entry3.tombstone());
+
+        Entry entry2 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 2);
+        assertTrue(entry1.empty());
+    }

Review Comment:
   Please add `lastRevision - 3`



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -975,30 +1020,104 @@ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is a tombstone.
+     * Compacts all entries by the given key, removing revision that are no longer needed.
+     * Last entry with a revision lesser or equal to the {@code minRevisionToKeep} and all consecutive entries will be preserved.
+     * If the first entry to keep is a tombstone, it will be removed.
+     * Example:
+     * <pre>
+     * Example 1:
+     *     put entry1: revision 5
+     *     put entry2: revision 7
+     *
+     *     do compaction: revision 6
+     *
+     *     entry1: exists
+     *     entry2: exists
+     *
+     * Example 2:
+     *     put entry1: revision 5
+     *     put entry2: revision 7
+     *
+     *     do compaction: revision 7
+     *
+     *     entry1: doesn't exist
+     *     entry2: exists
+     * </pre>
      *
      * @param batch Write batch.
      * @param key   Target key.
      * @param revs  Revisions.
+     * @param minRevisionToKeep Minimum revision that should be kept.
      * @throws RocksDBException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
-        long lastRev = lastRevision(revs);
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long minRevisionToKeep) throws RocksDBException {
+        if (revs.length < 2) {
+            // If we have less than two revisions, there is no point in compaction.
+            return;
+        }
+
+        // Index of the first revision we will be keeping in the array of revisions.
+        int idxToKeepFrom = 0;
+
+        // Whether there is an entry with the minRevisionToKeep.
+        boolean hasMinRevision = false;
+
+        // Traverse revisions, looking for the first revision that needs to be kept.
+        for (long rev : revs) {
+            if (rev >= minRevisionToKeep) {
+                if (rev == minRevisionToKeep) {
+                    hasMinRevision = true;
+                }
+                break;
+            }
+
+            idxToKeepFrom++;
+        }

Review Comment:
   I saw similar code in the test implementation, can we transfer it to a separate method?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -71,74 +72,78 @@ class MetaStorageWriteHandler {
     void handleWriteCommand(CommandClosure<WriteCommand> clo) {
         WriteCommand command = clo.command();
 
-        if (command instanceof PutCommand) {
-            PutCommand putCmd = (PutCommand) command;
+        HybridTimestamp safeTime;
 
-            storage.put(putCmd.key(), putCmd.value());
+        if (command instanceof MetaStorageWriteCommand) {
+            safeTime = ((MetaStorageWriteCommand) command).safeTime();
 
-            clo.result(null);
-        } else if (command instanceof GetAndPutCommand) {
-            GetAndPutCommand getAndPutCmd = (GetAndPutCommand) command;
+            if (command instanceof PutCommand) {

Review Comment:
   How about moving the processing of these commands into a separate private method with the safe time passed as an argument?
   
   Will there be any problems with exceptions?
   For example, in `org.apache.ignite.internal.table.distributed.raft.PartitionListener#onWrite` there is processing and if you remove it, then there will be freezes.



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value2, clock.now());
+
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Last operation was remove, so this is a tombstone.
+        Entry entry3 = storage.get(key, lastRevision);
+        assertTrue(entry3.tombstone());
+
+        Entry entry2 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 2);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactEmptyStorage() {
+        storage.compact(clock.now());

Review Comment:
   What is being tested here?



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value2, clock.now());
+
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Last operation was remove, so this is a tombstone.
+        Entry entry3 = storage.get(key, lastRevision);
+        assertTrue(entry3.tombstone());
+
+        Entry entry2 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 2);

Review Comment:
   ```suggestion
           Entry entry2 = storage.get(key, lastRevision - 2);
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value2, clock.now());
+
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Last operation was remove, so this is a tombstone.
+        Entry entry3 = storage.get(key, lastRevision);
+        assertTrue(entry3.tombstone());
+
+        Entry entry2 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 2);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactEmptyStorage() {
+        storage.compact(clock.now());
+    }
+
+    @Test
+    public void testCompactionBetweenRevisionsOfOneKey() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.put(key(1), keyValue(1, 0), clock.now());
+
+        HybridTimestamp ts = clock.now();

Review Comment:
   ```suggestion
           HybridTimestamp compactTs = clock.now();
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value2, clock.now());
+
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Last operation was remove, so this is a tombstone.
+        Entry entry3 = storage.get(key, lastRevision);
+        assertTrue(entry3.tombstone());
+
+        Entry entry2 = storage.get(key, lastRevision - 1);

Review Comment:
   ```suggestion
           Entry entry3 = storage.get(key, lastRevision - 1);
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -50,11 +49,24 @@ class RocksStorageUtils {
     static byte[] longToBytes(long value) {
         var buffer = new byte[Long.BYTES];
 
-        LONG_ARRAY_HANDLE.set(buffer, 0, value);
+        putLongToBytes(value, buffer, 0);
 
         return buffer;
     }
 
+    /**
+     * Puts a {@code long} value to the byte array at specified position.
+     *
+     * @param value Value.
+     * @param array Byte array.
+     * @param position Position to put long at.
+     */
+    static void putLongToBytes(long value, byte[] array, int position) {
+        assert position + Long.BYTES <= array.length;

Review Comment:
   ```suggestion
           assert position + Long.BYTES <= array.length: "pos=" + position + ", arr.len=" + array.length;
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value2, clock.now());
+
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Last operation was remove, so this is a tombstone.
+        Entry entry3 = storage.get(key, lastRevision);
+        assertTrue(entry3.tombstone());
+
+        Entry entry2 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 2);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactEmptyStorage() {
+        storage.compact(clock.now());
+    }
+
+    @Test
+    public void testCompactionBetweenRevisionsOfOneKey() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.put(key(1), keyValue(1, 0), clock.now());

Review Comment:
   Please move `key(1)` to var, and check it.



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();

Review Comment:
   ```suggestion
           HybridTimestamp comactTs = clock.now();
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);

Review Comment:
   ```suggestion
           byte[] value = keyValue(0, 0);
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    public void testCompactionAfterLastRevision() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Latest value, must exist.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertEquals(lastRevision, entry2.revision());
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+
+        storage.put(key, value1, clock.now());
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(clock.now());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry2 = storage.get(key, lastRevision);
+        assertTrue(entry2.empty());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 1);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionBetweenMultipleWrites() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+        byte[] value3 = keyValue(0, 2);
+        byte[] value4 = keyValue(0, 3);
+
+        storage.put(key, value1, clock.now());
+        storage.put(key, value2, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value3, clock.now());
+        storage.put(key, value4, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        Entry entry4 = storage.get(key, lastRevision);
+        assertArrayEquals(value4, entry4.value());
+
+        Entry entry3 = storage.get(key, lastRevision - 1);
+        assertArrayEquals(value3, entry3.value());
+
+        Entry entry2 = storage.get(key, lastRevision - 2);
+        assertArrayEquals(value2, entry2.value());
+
+        // Previous value, must be removed due to compaction.
+        Entry entry1 = storage.get(key, lastRevision - 3);
+        assertTrue(entry1.empty());
+    }
+
+    @Test
+    public void testCompactionAfterTombstoneRemovesTombstone() {
+        byte[] key = key(0);
+        byte[] value1 = keyValue(0, 0);
+        byte[] value2 = keyValue(0, 1);
+
+        storage.put(key, value1, clock.now());
+
+        storage.remove(key, clock.now());
+
+        HybridTimestamp ts = clock.now();
+
+        storage.put(key, value2, clock.now());
+
+        storage.remove(key, clock.now());
+
+        long lastRevision = storage.revision();
+
+        storage.compact(ts);
+
+        // Last operation was remove, so this is a tombstone.
+        Entry entry3 = storage.get(key, lastRevision);

Review Comment:
   ```suggestion
           Entry entry4 = storage.get(key, lastRevision);
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/CompactionKeyValueStorageTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.junit.jupiter.api.Test;
+
+/** Compaction tests. */
+public abstract class CompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {

Review Comment:
   ```suggestion
   public abstract class AbstractCompactionKeyValueStorageTest extends AbstractKeyValueStorageTest {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org