You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/06/22 18:55:46 UTC

[ignite-3] branch main updated: IGNITE-19745 Add a method for local obtaining entries in MetaStorage from lower bound revision to upper bound revision (#2227)

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eabf3d24d6 IGNITE-19745 Add a method for local obtaining entries in MetaStorage from lower bound revision to upper bound revision (#2227)
eabf3d24d6 is described below

commit eabf3d24d62cd0c1fde1208609b1b3523fd32167
Author: Sergey Uttsel <ut...@gmail.com>
AuthorDate: Thu Jun 22 21:55:40 2023 +0300

    IGNITE-19745 Add a method for local obtaining entries in MetaStorage from lower bound revision to upper bound revision (#2227)
---
 .../internal/metastorage/MetaStorageManager.java   | 15 ++++
 .../metastorage/impl/MetaStorageManagerImpl.java   | 15 ++++
 .../metastorage/server/KeyValueStorage.java        | 12 +++
 .../server/persistence/RocksDbKeyValueStorage.java | 98 ++++++++++++++++++++++
 .../server/BasicOperationsKeyValueStorageTest.java | 82 ++++++++++++++++++
 .../server/SimpleInMemoryKeyValueStorage.java      | 79 +++++++++++++++++
 6 files changed, 301 insertions(+)

diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 5d1d411f70..c2850211d0 100644
--- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -60,6 +61,20 @@ public interface MetaStorageManager extends IgniteComponent {
      */
     CompletableFuture<Entry> get(ByteArray key, long revUpperBound);
 
+    /**
+     * Returns all entries corresponding to the given key and bounded by given revisions.
+     * All these entries are ordered by revisions and have the same key.
+     * The lower bound and the upper bound are inclusive.
+     * TODO: IGNITE-19735 move this method to another interface for interaction with local KeyValueStorage.
+     *
+     * @param key The key.
+     * @param revLowerBound The lower bound of revision.
+     * @param revUpperBound The upper bound of revision.
+     * @return Entries corresponding to the given key.
+     */
+    @Deprecated
+    List<Entry> getLocally(byte[] key, long revLowerBound, long revUpperBound);
+
     /**
      * Retrieves entries for given keys.
      */
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 082249a8fa..8daf22341e 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -25,6 +25,7 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.lang.ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterService;
 import org.jetbrains.annotations.Nullable;
@@ -319,6 +321,19 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
         }
     }
 
+    @Override
+    public List<Entry> getLocally(byte[] key, long revLowerBound, long revUpperBound) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return storage.get(key, revLowerBound, revUpperBound);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     @Override
     public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) {
         if (!busyLock.enterBusy()) {
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3da54cb036..cc2edef81b 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -70,6 +70,18 @@ public interface KeyValueStorage extends ManuallyCloseable {
      */
     Entry get(byte[] key, long revUpperBound);
 
+    /**
+     * Returns all entries corresponding to the given key and bounded by given revisions.
+     * All these entries are ordered by revisions and have the same key.
+     * The lower bound and the upper bound are inclusive.
+     *
+     * @param key The key.
+     * @param revLowerBound The lower bound of revision.
+     * @param revUpperBound The upper bound of revision.
+     * @return Entries corresponding to the given key.
+     */
+    List<Entry> get(byte[] key, long revLowerBound, long revUpperBound);
+
     /**
      * Returns all entries corresponding to given keys.
      *
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 15ba8667af..9458da0828 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -46,6 +46,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -574,6 +575,17 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
         }
     }
 
+    @Override
+    public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound) {
+        rwLock.readLock().lock();
+
+        try {
+            return doGet(key, revLowerBound, revUpperBound);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
     @Override
     public Collection<Entry> getAll(List<byte[]> keys) {
         rwLock.readLock().lock();
@@ -1215,6 +1227,52 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
         return doGetValue(key, lastRev);
     }
 
+    /**
+     * Returns all entries corresponding to the given key and bounded by given revisions.
+     * All these entries are ordered by revisions and have the same key.
+     * The lower bound and the upper bound are inclusive.
+     *
+     * @param key The key.
+     * @param revLowerBound The lower bound of revision.
+     * @param revUpperBound The upper bound of revision.
+     * @return Entries corresponding to the given key.
+     */
+    private List<Entry> doGet(byte[] key, long revLowerBound, long revUpperBound) {
+        assert revLowerBound >= 0 : "Invalid arguments: [revLowerBound=" + revLowerBound + ']';
+        assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" + revUpperBound + ']';
+        assert revUpperBound >= revLowerBound
+                : "Invalid arguments: [revLowerBound=" + revLowerBound + ", revUpperBound=" + revUpperBound + ']';
+        // TODO: IGNITE-19782 throw CompactedException if revLowerBound is compacted.
+
+        long[] revs;
+
+        try {
+            revs = getRevisions(key);
+        } catch (RocksDBException e) {
+            throw new MetaStorageException(OP_EXECUTION_ERR, e);
+        }
+
+        if (revs.length == 0) {
+            return Collections.emptyList();
+        }
+
+        int firstRevIndex = minRevisionIndex(revs, revLowerBound);
+        int lastRevIndex = maxRevisionIndex(revs, revUpperBound);
+
+        // Either index is -1 when no revision of the key satisfies the corresponding bound, so there is nothing to return.
+        if (firstRevIndex == -1 || lastRevIndex == -1) {
+            return Collections.emptyList();
+        }
+
+        List<Entry> entries = new ArrayList<>();
+
+        for (int i = firstRevIndex; i <= lastRevIndex; i++) {
+            entries.add(doGetValue(key, revs[i]));
+        }
+
+        return entries;
+    }
+
     /**
      * Get a list of the revisions of the entry corresponding to the key.
      *
@@ -1252,6 +1310,46 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
         return -1;
     }
 
+    /**
+     * Returns the index of the minimum revision that is greater than or equal to {@code lowerBoundRev}.
+     * If there is no such revision, {@code -1} is returned.
+     *
+     * @param revs          Revisions list.
+     * @param lowerBoundRev Revision lower bound.
+     * @return Index of minimum revision or {@code -1} if there is no such revision.
+     */
+    private static int minRevisionIndex(long[] revs, long lowerBoundRev) {
+        for (int i = 0; i < revs.length; i++) {
+            long rev = revs[i];
+
+            if (rev >= lowerBoundRev) {
+                return i;
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Returns the index of the maximum revision that is less than or equal to {@code upperBoundRev}.
+     * If there is no such revision, {@code -1} is returned.
+     *
+     * @param revs          Revisions list.
+     * @param upperBoundRev Revision upper bound.
+     * @return Index of maximum revision or {@code -1} if there is no such revision.
+     */
+    private static int maxRevisionIndex(long[] revs, long upperBoundRev) {
+        for (int i = revs.length - 1; i >= 0; i--) {
+            long rev = revs[i];
+
+            if (rev <= upperBoundRev) {
+                return i;
+            }
+        }
+
+        return -1;
+    }
+
     /**
      * Gets the value by a key and a revision.
      *
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 029980fc28..520f33bdb1 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -190,6 +191,87 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
         assertFalse(key3EntryBounded5.empty());
     }
 
+    @Test
+    void getWithRevisionLowerUpperBound() {
+        byte[] key1 = key(1);
+        byte[] key2 = key(2);
+
+        byte[] val1 = keyValue(1, 1);
+        byte[] val2 = keyValue(1, 2);
+        byte[] val3 = keyValue(2, 3);
+        byte[] val4 = keyValue(1, 4);
+        byte[] val5 = keyValue(1, 5);
+        byte[] val6 = keyValue(2, 6);
+        byte[] val7 = keyValue(2, 7);
+        byte[] val8 = keyValue(1, 8);
+        byte[] val9 = keyValue(1, 9);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        putToMs(key1, val1);
+        putToMs(key1, val2);
+        putToMs(key2, val3);
+        putToMs(key1, val4);
+        putToMs(key1, val5);
+        putToMs(key2, val6);
+        putToMs(key2, val7);
+        putToMs(key1, val8);
+        putToMs(key1, val9);
+
+        removeFromMs(key1);
+
+        assertEquals(10, storage.revision());
+        assertEquals(10, storage.updateCounter());
+
+        // Check that a lower revision and an upper revision are inclusive.
+        // Check that entry with another key is not included in a result list.
+        List<Entry> entries1 = storage.get(key1, 2, 5);
+        List<byte[]> values1 = entries1.stream().map(entry -> entry.value()).collect(Collectors.toList());
+
+        assertEquals(3, entries1.size());
+        assertArrayEquals(val2, values1.get(0));
+        assertArrayEquals(val4, values1.get(1));
+        assertArrayEquals(val5, values1.get(2));
+
+        // Check that entries of another key whose revisions equal the lower and the upper bounds are not included.
+        List<Entry> entries2 = storage.get(key1, 3, 6);
+        List<byte[]> values2 = entries2.stream().map(entry -> entry.value()).collect(Collectors.toList());
+
+        assertEquals(2, entries2.size());
+        assertArrayEquals(val4, values2.get(0));
+        assertArrayEquals(val5, values2.get(1));
+
+        // Get one entry. The lower and the upper revision are equal.
+        List<Entry> entries3 = storage.get(key1, 8, 8);
+        List<byte[]> values3 = entries3.stream().map(entry -> entry.value()).collect(Collectors.toList());
+
+        assertEquals(1, entries3.size());
+        assertArrayEquals(val8, values3.get(0));
+
+        // Try to get entries when the revision range doesn't contain entries with the key.
+        List<Entry> entries4 = storage.get(key1, 6, 7);
+
+        assertTrue(entries4.isEmpty());
+
+        // Try to get entries when the storage doesn't contain entries with specified revisions.
+        List<Entry> entries5 = storage.get(key1, 20, 30);
+
+        assertTrue(entries5.isEmpty());
+
+        // Get a tombstone.
+        List<Entry> entries6 = storage.get(key1, 10, 10);
+
+        assertEquals(1, entries6.size());
+        assertTrue(entries6.get(0).tombstone());
+        assertNull(entries6.get(0).value());
+
+        // Check validation asserts.
+        assertThrows(AssertionError.class, () -> storage.get(key1, -1, 1));
+        assertThrows(AssertionError.class, () -> storage.get(key1, 1, -1));
+        assertThrows(AssertionError.class, () -> storage.get(key1, 2, 1));
+    }
+
     @Test
     void getAll() {
         byte[] key1 = key(1);
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 07e9a913d7..98656f1133 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -181,6 +182,14 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         }
     }
 
+
+    @Override
+    public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound) {
+        synchronized (mux) {
+            return doGet(key, revLowerBound, revUpperBound);
+        }
+    }
+
     @Override
     public Collection<Entry> getAll(List<byte[]> keys) {
         synchronized (mux) {
@@ -646,6 +655,36 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return doGetValue(key, lastRev);
     }
 
+    private List<Entry> doGet(byte[] key, long revLowerBound, long revUpperBound) {
+        assert revLowerBound >= 0 : "Invalid arguments: [revLowerBound=" + revLowerBound + ']';
+        assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" + revUpperBound + ']';
+        assert revUpperBound >= revLowerBound
+                : "Invalid arguments: [revLowerBound=" + revLowerBound + ", revUpperBound=" + revUpperBound + ']';
+        // TODO: IGNITE-19782 throw CompactedException if revLowerBound is compacted.
+
+        List<Long> revs = keysIdx.get(key);
+
+        if (revs == null || revs.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        int firstRevIndex = minRevisionIndex(revs, revLowerBound);
+        int lastRevIndex = maxRevisionIndex(revs, revUpperBound);
+
+        // Either index is -1 when no revision of the key satisfies the corresponding bound, so there is nothing to return.
+        if (firstRevIndex == -1 || lastRevIndex == -1) {
+            return Collections.emptyList();
+        }
+
+        List<Entry> entries = new ArrayList<>();
+
+        for (int i = firstRevIndex; i <= lastRevIndex; i++) {
+            entries.add(doGetValue(key, revs.get(i)));
+        }
+
+        return entries;
+    }
+
     /**
      * Returns maximum revision which must be less or equal to {@code upperBoundRev}. If there is no such revision then {@code -1} will be
      * returned.
@@ -668,6 +707,46 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return -1;
     }
 
+    /**
+     * Returns the index of the minimum revision that is greater than or equal to {@code lowerBoundRev}.
+     * If there is no such revision, {@code -1} is returned.
+     *
+     * @param revs          Revisions list.
+     * @param lowerBoundRev Revision lower bound.
+     * @return Index of minimum revision or {@code -1} if there is no such revision.
+     */
+    private static int minRevisionIndex(List<Long> revs, long lowerBoundRev) {
+        for (int i = 0; i < revs.size(); i++) {
+            long rev = revs.get(i);
+
+            if (rev >= lowerBoundRev) {
+                return i;
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Returns the index of the maximum revision that is less than or equal to {@code upperBoundRev}.
+     * If there is no such revision, {@code -1} is returned.
+     *
+     * @param revs          Revisions list.
+     * @param upperBoundRev Revision upper bound.
+     * @return Index of maximum revision or {@code -1} if there is no such revision.
+     */
+    private static int maxRevisionIndex(List<Long> revs, long upperBoundRev) {
+        for (int i = revs.size() - 1; i >= 0; i--) {
+            long rev = revs.get(i);
+
+            if (rev <= upperBoundRev) {
+                return i;
+            }
+        }
+
+        return -1;
+    }
+
     private Entry doGetValue(byte[] key, long lastRev) {
         if (lastRev == 0) {
             return EntryImpl.empty(key);