Posted to notifications@ignite.apache.org by "tkalkirill (via GitHub)" <gi...@apache.org> on 2023/05/04 18:05:13 UTC

[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2019: IGNITE-14734 Implement time-based compaction for meta storage

tkalkirill commented on code in PR #2019:
URL: https://github.com/apache/ignite-3/pull/2019#discussion_r1185274821


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java:
##########
@@ -233,9 +244,13 @@ public interface KeyValueStorage extends ManuallyCloseable {
 
     /**
      * Compacts storage (removes tombstones).
-     * TODO: IGNITE-16444 Correct compaction for Meta storage.
+     *
+     * @param compactionWatermark A time threshold for the entry. Only entries that have revisions with timestamp less than or equal to the
+     *     watermark can be removed.
      */
-    void compact();
+    // TODO: IGNITE-16444 Correct compaction for Meta storage.
+    // TODO: IGNITE-19417 Provide low-watermark for compaction.
+    void compact(HybridTimestamp compactionWatermark);

Review Comment:
   Could this also be called a low watermark?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -50,11 +55,19 @@ class RocksStorageUtils {
     static byte[] longToBytes(long value) {
         var buffer = new byte[Long.BYTES];
 
-        LONG_ARRAY_HANDLE.set(buffer, 0, value);
+        putLongToBytes(value, buffer, 0);
 
         return buffer;
     }
 
+    static void putLongToBytes(long value, byte[] buffer, int position) {

Review Comment:
   Missing javadoc.
   Could you add an `assert` that the buffer has at least `Long#BYTES` bytes starting at `position`?
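   A possible shape for these helpers, as a sketch only: it assumes `LONG_ARRAY_HANDLE` is a big-endian `MethodHandles.byteArrayViewVarHandle` over `long[]` (its declaration is not part of this diff), and the class name `LongBytesSketch` is hypothetical. The asserts check the length remaining after the offset rather than the whole array length, because these overloads write into or read from the middle of a buffer.

   ```java
   import java.lang.invoke.MethodHandles;
   import java.lang.invoke.VarHandle;
   import java.nio.ByteOrder;

   /** Hypothetical standalone version of the long helpers with javadoc and bounds assertions. */
   final class LongBytesSketch {
       // Assumption: the real LONG_ARRAY_HANDLE is a big-endian view like this one.
       private static final VarHandle LONG_ARRAY_HANDLE =
               MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);

       private LongBytesSketch() {
       }

       /**
        * Writes a long value into the buffer, starting at the given position.
        *
        * @param value Value to write.
        * @param buffer Target buffer.
        * @param position Offset of the first byte to write.
        */
       static void putLongToBytes(long value, byte[] buffer, int position) {
           assert position >= 0 && buffer.length - position >= Long.BYTES
                   : "Not enough space for a long: length=" + buffer.length + ", position=" + position;

           LONG_ARRAY_HANDLE.set(buffer, position, value);
       }

       /**
        * Reads a long value from the array, starting at the given offset.
        *
        * @param array Source array.
        * @param offset Offset of the first byte to read.
        * @return Long value.
        */
       static long bytesToLong(byte[] array, int offset) {
           assert offset >= 0 && array.length - offset >= Long.BYTES
                   : "Not enough bytes for a long: length=" + array.length + ", offset=" + offset;

           return (long) LONG_ARRAY_HANDLE.get(array, offset);
       }
   }
   ```

   Java assertions are disabled by default, so these checks only run in tests or with `-ea`.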



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -188,4 +214,18 @@ static long[] getAsLongs(byte[] bytes) {
 
         return result;
     }
+
+    static byte @NotNull [] longsToBytes(long @NotNull [] values, int valuesOffset) {

Review Comment:
   Missing javadoc.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -64,7 +77,21 @@ static byte[] longToBytes(long value) {
     static long bytesToLong(byte[] array) {
         assert array.length == Long.BYTES;
 
-        return (long) LONG_ARRAY_HANDLE.get(array, 0);
+        return bytesToLong(array, 0);
+    }
+
+    static long bytesToLong(byte[] array, int offset) {
+        return (long) LONG_ARRAY_HANDLE.get(array, offset);
+    }
+
+    /**
+     * Reads an int value from a byte array at the given offset.
+     *
+     * @param array Byte array.
+     * @return Int value.
+     */
+    static int bytesToInt(byte[] array, int offset) {

Review Comment:
   Could you add an `assert` that the array has at least `Integer#BYTES` bytes starting at `offset`?
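   The same pattern for the `int` helpers (`bytesToInt` here and `putIntToBytes` in the same file), again a hedged sketch: `IntBytesSketch` and `INT_ARRAY_HANDLE` are hypothetical names, and big-endian order is assumed so that ints are laid out the same way as the longs.

   ```java
   import java.lang.invoke.MethodHandles;
   import java.lang.invoke.VarHandle;
   import java.nio.ByteOrder;

   /** Hypothetical standalone version of the int helpers with javadoc and bounds assertions. */
   final class IntBytesSketch {
       // Assumption: big-endian, mirroring the long handle.
       private static final VarHandle INT_ARRAY_HANDLE =
               MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN);

       private IntBytesSketch() {
       }

       /**
        * Writes an int value into the buffer, starting at the given position.
        *
        * @param value Value to write.
        * @param buffer Target buffer.
        * @param position Offset of the first byte to write.
        */
       static void putIntToBytes(int value, byte[] buffer, int position) {
           assert position >= 0 && buffer.length - position >= Integer.BYTES
                   : "Not enough space for an int: length=" + buffer.length + ", position=" + position;

           INT_ARRAY_HANDLE.set(buffer, position, value);
       }

       /**
        * Reads an int value from the array, starting at the given offset.
        *
        * @param array Source array.
        * @param offset Offset of the first byte to read.
        * @return Int value.
        */
       static int bytesToInt(byte[] array, int offset) {
           assert offset >= 0 && array.length - offset >= Integer.BYTES
                   : "Not enough bytes for an int: length=" + array.length + ", offset=" + offset;

           return (int) INT_ARRAY_HANDLE.get(array, offset);
       }
   }
   ```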



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */

Review Comment:
   Revision of what?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -156,8 +183,7 @@ static byte[] valueToBytes(byte[] value, long updateCounter) {
      * @param bytes Byte array of longs.
      * @return Array of longs.
      */
-    @NotNull
-    static long[] getAsLongs(byte[] bytes) {
+    static long @NotNull [] getAsLongs(byte[] bytes) {

Review Comment:
   ```suggestion
       static long[] getAsLongs(byte[] bytes) {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -929,17 +973,32 @@ public void removeWatch(WatchListener listener) {
     }
 
     @Override
-    public void compact() {
+    public void compact(HybridTimestamp compactionWatermark) {
         rwLock.writeLock().lock();
 
+        byte[] tsBytes = hybridTsToArray(compactionWatermark);
+
         try (WriteBatch batch = new WriteBatch()) {
+            long maxRevision;
+
+            // Find a revision with timestamp less than or equal to the watermark.
+            try (RocksIterator rocksIterator = tsToRevision.newIterator()) {
+                rocksIterator.seekForPrev(tsBytes);
+
+                RocksUtils.checkIterator(rocksIterator);
+
+                maxRevision = bytesToLong(rocksIterator.value());
+            }
+
+            long maxCompactionRevision = maxRevision;

Review Comment:
   Why introduce another variable?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -929,17 +973,32 @@ public void removeWatch(WatchListener listener) {
     }
 
     @Override
-    public void compact() {
+    public void compact(HybridTimestamp compactionWatermark) {
         rwLock.writeLock().lock();
 
+        byte[] tsBytes = hybridTsToArray(compactionWatermark);
+
         try (WriteBatch batch = new WriteBatch()) {
+            long maxRevision;

Review Comment:
   Please add a comment explaining what this variable holds.
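   For reference, what the `seekForPrev` block computes, modelled in memory as a sketch: `TreeMap#floorEntry` plays the role of `RocksIterator#seekForPrev`, since both find the greatest key less than or equal to the target. `WatermarkLookupSketch` and its methods are hypothetical, and returning an empty result when no revision is old enough is an assumption; the diff does not show how that case is handled.

   ```java
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.OptionalLong;
   import java.util.TreeMap;

   /** Hypothetical in-memory model of the timestamp-to-revision lookup done by compact(). */
   final class WatermarkLookupSketch {
       /** Key: timestamp at which a storage revision was created; value: that revision. */
       private final NavigableMap<Long, Long> tsToRevision = new TreeMap<>();

       void onRevisionApplied(long timestamp, long revision) {
           tsToRevision.put(timestamp, revision);
       }

       /**
        * Returns the newest revision whose timestamp is less than or equal to the watermark,
        * i.e. the maximum revision that compaction may remove, or empty if there is none.
        */
       OptionalLong maxCompactionRevision(long watermark) {
           // Equivalent of seekForPrev(watermark): last key at or before the target.
           Map.Entry<Long, Long> entry = tsToRevision.floorEntry(watermark);

           return entry == null ? OptionalLong.empty() : OptionalLong.of(entry.getValue());
       }
   }
   ```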



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -975,30 +1034,59 @@ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is a tombstone.
+     * Compacts all entries by the given key, removing all previous revisions less than or equal to the revision watermark and
+     * deleting the last entry if it is a tombstone.
      *
      * @param batch Write batch.
      * @param key   Target key.
      * @param revs  Revisions.
+     * @param revisionWatermark Maximum revision that can be removed.
      * @throws RocksDBException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long revisionWatermark) throws RocksDBException {
         long lastRev = lastRevision(revs);
 
+        int idxToKeepFrom = 0;
+
         for (int i = 0; i < revs.length - 1; i++) {
-            data.delete(batch, keyToRocksKey(revs[i], key));
+            long rev = revs[i];
+
+            if (rev > revisionWatermark) {
+                break;
+            }
+
+            // This revision is not needed anymore, remove data.
+            data.delete(batch, keyToRocksKey(rev, key));
+
+            idxToKeepFrom++;
         }
 
-        byte[] rocksKey = keyToRocksKey(lastRev, key);
+        // Whether we only have the last revision (even if it's less than or equal to the watermark).

Review Comment:
   The code below is hard to read; I believe it can be simplified a bit and documented.
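   One possible simplification, sketched as a pure function over the revision array so the rules are easy to read and test. Assumptions not confirmed by the visible part of the diff: `revs` is non-empty and sorted ascending, and the newest revision is removed only when it is a tombstone at or below the watermark. `CompactForKeySketch` and `revisionsToRemove` are hypothetical; the real method would call `data.delete(batch, keyToRocksKey(rev, key))` for each returned revision and update the index accordingly.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical pure-function version of the per-key compaction rules. */
   final class CompactForKeySketch {
       private CompactForKeySketch() {
       }

       /**
        * Returns the revisions of a single key that compaction may delete.
        *
        * @param revs Revisions of the key, ascending (assumed non-empty).
        * @param revisionWatermark Maximum revision that can be removed.
        * @param lastIsTombstone Whether the newest revision is a tombstone.
        */
       static List<Long> revisionsToRemove(long[] revs, long revisionWatermark, boolean lastIsTombstone) {
           assert revs.length > 0;

           List<Long> toRemove = new ArrayList<>();
           int lastIdx = revs.length - 1;

           // Every older revision is superseded by a newer one, so it can go once it is at or below the watermark.
           for (int i = 0; i < lastIdx && revs[i] <= revisionWatermark; i++) {
               toRemove.add(revs[i]);
           }

           // The newest revision holds the current value: drop it only if it is a tombstone at or below the watermark.
           if (lastIsTombstone && revs[lastIdx] <= revisionWatermark) {
               toRemove.add(revs[lastIdx]);
           }

           return toRemove;
       }
   }
   ```

   Because `revs` is ascending, the loop can stop at the first revision above the watermark, which replaces the separate `idxToKeepFrom` counter.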



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -50,11 +55,19 @@ class RocksStorageUtils {
     static byte[] longToBytes(long value) {
         var buffer = new byte[Long.BYTES];
 
-        LONG_ARRAY_HANDLE.set(buffer, 0, value);
+        putLongToBytes(value, buffer, 0);
 
         return buffer;
     }
 
+    static void putLongToBytes(long value, byte[] buffer, int position) {
+        LONG_ARRAY_HANDLE.set(buffer, position, value);
+    }
+
+    static void putIntToBytes(int value, byte[] buffer, int position) {

Review Comment:
   Missing javadoc.
   Could you add an `assert` that the buffer has at least `Integer#BYTES` bytes starting at `position`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -188,4 +214,18 @@ static long[] getAsLongs(byte[] bytes) {
 
         return result;
     }
+
+    static byte @NotNull [] longsToBytes(long @NotNull [] values, int valuesOffset) {

Review Comment:
   ```suggestion
       static byte[] longsToBytes(long[] values, int valuesOffset) {
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java:
##########
@@ -28,7 +28,11 @@ enum StorageColumnFamilyType {
     DATA(RocksDB.DEFAULT_COLUMN_FAMILY),
 
     /** Column family for the index. Index is a mapping from entry key to a list of revisions of the storage. */
-    INDEX("INDEX".getBytes(StandardCharsets.UTF_8));
+    INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
+
+    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),

Review Comment:
   Missing javadoc



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java:
##########
@@ -64,7 +77,21 @@ static byte[] longToBytes(long value) {
     static long bytesToLong(byte[] array) {
         assert array.length == Long.BYTES;
 
-        return (long) LONG_ARRAY_HANDLE.get(array, 0);
+        return bytesToLong(array, 0);
+    }
+
+    static long bytesToLong(byte[] array, int offset) {

Review Comment:
   Missing javadoc.
   Could you add an `assert` that the array has at least `Long#BYTES` bytes starting at `offset`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */

Review Comment:
   Revision of what?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java:
##########
@@ -28,7 +28,11 @@ enum StorageColumnFamilyType {
     DATA(RocksDB.DEFAULT_COLUMN_FAMILY),
 
     /** Column family for the index. Index is a mapping from entry key to a list of revisions of the storage. */
-    INDEX("INDEX".getBytes(StandardCharsets.UTF_8));
+    INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
+
+    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),
+
+    REVISION_TO_TS("REVTOTTS".getBytes(StandardCharsets.UTF_8));

Review Comment:
   Missing javadoc
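   A sketch of the kind of javadoc these constants could carry, written as a standalone enum so it compiles without RocksDB (which is why `DATA` is omitted). The descriptions assume that "revision" means a revision of the whole storage, as in the `INDEX` javadoc above, and that the timestamp is the one at which that revision was created; `ColumnFamilyTypeSketch` and `nameAsBytes` are hypothetical names.

   ```java
   import java.nio.charset.StandardCharsets;

   /** Hypothetical standalone copy of the new column family constants with javadoc. */
   enum ColumnFamilyTypeSketch {
       /**
        * Column family mapping a hybrid timestamp to the storage revision created at that timestamp.
        * Used by compaction to find the newest revision at or below the compaction watermark.
        */
       TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),

       /** Column family mapping a storage revision to the hybrid timestamp at which it was created. */
       REVISION_TO_TS("REVTOTTS".getBytes(StandardCharsets.UTF_8));

       /** Column family name as stored in RocksDB. */
       private final byte[] nameAsBytes;

       ColumnFamilyTypeSketch(byte[] nameAsBytes) {
           this.nameAsBytes = nameAsBytes;
       }

       /** Returns a copy of the column family name, so callers cannot mutate the constant. */
       byte[] nameAsBytes() {
           return nameAsBytes.clone();
       }
   }
   ```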



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -404,14 +440,22 @@ private void fillAndWriteBatch(WriteBatch batch, long newRev, long newCntr) thro
         queueWatchEvent();
     }
 
+    private byte[] hybridTsToArray(HybridTimestamp ts) {
+        byte[] array = new byte[Long.BYTES];
+
+        putLongToBytes(ts.longValue(), array, 0);
+
+        return array;
+    }

Review Comment:
   ```suggestion
       private static byte[] hybridTsToBytes(HybridTimestamp ts) {
           return longToBytes(ts.longValue());
       }
   ```
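   Whichever helper is used, these bytes become keys in `tsToRevision`, which `compact` searches with `seekForPrev`. That lookup is only correct if byte order matches timestamp order under RocksDB's default bytewise comparator (unsigned, lexicographic). A small sketch of why a big-endian encoding gives that, assuming `LONG_ARRAY_HANDLE` is big-endian and timestamp long values are non-negative; `TimestampKeyOrderSketch` is a hypothetical name, and `ByteBuffer` (big-endian by default) stands in for the VarHandle.

   ```java
   import java.nio.ByteBuffer;
   import java.util.Arrays;

   /** Hypothetical check that big-endian timestamp keys sort in timestamp order. */
   final class TimestampKeyOrderSketch {
       private TimestampKeyOrderSketch() {
       }

       /** Encodes a timestamp's long value the same way as the suggested hybridTsToBytes (assuming big-endian). */
       static byte[] tsToBytes(long tsLongValue) {
           return ByteBuffer.allocate(Long.BYTES).putLong(tsLongValue).array();
       }

       /** Compares keys like RocksDB's default bytewise comparator: unsigned, byte by byte. */
       static int compareKeys(byte[] a, byte[] b) {
           return Arrays.compareUnsigned(a, b);
       }
   }
   ```

   A negative value would sort after every non-negative one under this comparison, which is why the non-negativity assumption matters.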



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -231,12 +242,21 @@ private static List<ColumnFamilyDescriptor> cfDescriptors() {
         ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
 
         Options indexOptions = new Options().setCreateIfMissing(true);
-
         ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
 
+        Options tsToRevOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Integer.BYTES + Long.BYTES);
+        ColumnFamilyOptions tsToRevFamilyOptions = new ColumnFamilyOptions(tsToRevOptions);
+
+        Options revToTsOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Long.BYTES);

Review Comment:
   Shouldn't it be `+ Integer.BYTES` here as well?
   Also, maybe use `org.apache.ignite.internal.hlc.HybridTimestamp#HYBRID_TIMESTAMP_SIZE`.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -245,7 +265,7 @@ private void recreateDb() throws RocksDBException {
 
         List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
 
-        assert descriptors.size() == 2;
+        assert descriptors.size() == 4;

Review Comment:
   Nice



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -231,12 +242,21 @@ private static List<ColumnFamilyDescriptor> cfDescriptors() {
         ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
 
         Options indexOptions = new Options().setCreateIfMissing(true);
-
         ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
 
+        Options tsToRevOptions = new Options().setCreateIfMissing(true)
+                .useFixedLengthPrefixExtractor(Integer.BYTES + Long.BYTES);

Review Comment:
   Maybe use `org.apache.ignite.internal.hlc.HybridTimestamp#HYBRID_TIMESTAMP_SIZE` instead.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -975,30 +1034,59 @@ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev,
     }
 
     /**
-     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is a tombstone.
+     * Compacts all entries by the given key, removing all previous revisions less than or equal to the revision watermark and
+     * deleting the last entry if it is a tombstone.
      *
      * @param batch Write batch.
      * @param key   Target key.
      * @param revs  Revisions.
+     * @param revisionWatermark Maximum revision that can be removed.
      * @throws RocksDBException If failed.
      */
-    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long revisionWatermark) throws RocksDBException {

Review Comment:
   Why "watermark"?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -150,6 +155,12 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
     /** Index column family. */
     private volatile ColumnFamily index;
 
+    /** Timestamp to revision mapping column family. */
+    private volatile ColumnFamily tsToRevision;
+
+    /** Revision to timestamp mapping column family. */
+    private volatile ColumnFamily revisionToTs;

Review Comment:
   Maybe I'm misreading it, but it seems you only write data to this `ColumnFamily` and never read from it. Is it needed?



-- 
This is an automated message from the Apache Git Service.